-
Notifications
You must be signed in to change notification settings - Fork 37
Complete autonomous landing module #262
Conversation
modules/auto_landing/auto_landing.py
Outdated
| self.__logger = local_logger | ||
| self.__selection_strategy = selection_strategy | ||
|
|
||
| def _select_detection(self, detections: list) -> Optional[int]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think you don't have to include the typing import and can just specify return type without it
modules/auto_landing/auto_landing.py
Outdated
| NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center | ||
| LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think NEAREST_TO CENTER and LARGEST_AREA are not as applicable, this may depend on where we move this worker around to but right now we have math implemented which makes it irrelevant where the centre of the detection is to the image, and also the area isn't as applicable because it could be a false detection (depends on the accuracy of our model)
| NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center | ||
| LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area | ||
| HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence | ||
| FIRST_DETECTION = "first_detection" # Use first detection in list (original behavior) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iffy about first detection because it also runs on the assumption that the first detection is correct, but leaving for now
modules/auto_landing/auto_landing.py
Outdated
| if self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER: | ||
| # Find detection closest to image center | ||
| image_center_x = self.im_w / 2 | ||
| image_center_y = self.im_h / 2 | ||
|
|
||
| min_distance = float("inf") | ||
| selected_index = 0 | ||
|
|
||
| for i, detection in enumerate(detections): | ||
| det_center_x, det_center_y = detection.get_centre() | ||
| distance = math.sqrt( | ||
| (det_center_x - image_center_x) ** 2 + (det_center_y - image_center_y) ** 2 | ||
| ) | ||
| if distance < min_distance: | ||
| min_distance = distance | ||
| selected_index = i | ||
|
|
||
| return selected_index |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
won't need this
modules/auto_landing/auto_landing.py
Outdated
| if self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA: | ||
| # Find detection with largest bounding box area | ||
| max_area = 0 | ||
| selected_index = 0 | ||
|
|
||
| for i, detection in enumerate(detections): | ||
| width = detection.x_2 - detection.x_1 | ||
| height = detection.y_2 - detection.y_1 | ||
| area = width * height | ||
| if area > max_area: | ||
| max_area = area | ||
| selected_index = i | ||
|
|
||
| return selected_index |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or this
modules/auto_landing/auto_landing.py
Outdated
| if len(detections) == 1: | ||
| return 0 | ||
|
|
||
| if self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could put in the same if statement cause same output
| time.sleep(period) | ||
|
|
||
|
|
||
| def _process_commands( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think what you're trying to implement is a queue that has a bunch of commands meaning the autolander decides whether or not it wants to autoland, and the fact that it can change from wanting to or not during execution hence calling the function in the worker (correct me if i'm wrong). i think the simplistic way to go is to assume that we will know whether or not we want to autoland, and that decision will not change throughout the execution of the worker once it's decided.
| """ | ||
|
|
||
| def __init__(self, command: str) -> None: | ||
| self.command = command |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what will make the most sense for making auto landing an enable/disable thing would be to include this as a constant in the config.yaml so that it's easily configurable
modules/auto_landing/auto_landing.py
Outdated
| self.__auto_landing = auto_landing | ||
| self.__logger = local_logger | ||
| self.__enabled = False | ||
| self.__enabled_lock = threading.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by getting rid of the queue that could possibly introduce the race conditions of enabling and disabling auto landing, we won't need the thread lock anymore cause this can only change once
| IMAGE_HEIGHT = 640.0 # pixels | ||
| IMAGE_WIDTH = 640.0 # pixels | ||
| WORKER_PERIOD = 0.1 # seconds | ||
| DETECTION_STRATEGY = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will need to change
| assert output_queue.queue.empty() | ||
|
|
||
| # Test 2: Enable auto-landing and verify it processes detections | ||
| enable_command = auto_landing_worker.AutoLandingCommand("enable") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will need this reconfigured for global constant in config.yaml
| detection3 = create_test_detection([100, 100, 200, 200], 1, 0.7) # Far from center | ||
| detection4 = create_test_detection([310, 310, 330, 330], 2, 0.8) # Close to center (320, 320) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can change this
| assert landing_info2 is not None | ||
|
|
||
| # Test 4: Disable auto-landing and verify it stops processing | ||
| disable_command = auto_landing_worker.AutoLandingCommand("disable") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above for enable/disable
| # Ensure logs directory exists and create timestamped subdirectory | ||
| LOG_DIR = pathlib.Path("logs") | ||
| LOG_DIR.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| # Create a timestamped subdirectory for this test session | ||
| TIMESTAMP_FORMAT = "%Y-%m-%d_%H-%M-%S" | ||
| test_session_dir = LOG_DIR / datetime.datetime.now().strftime(TIMESTAMP_FORMAT) | ||
| test_session_dir.mkdir(parents=True, exist_ok=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we wont need to keep logs for the tests, the terminal print statements should suffice
| if worker.is_alive(): | ||
| worker.terminate() | ||
| worker.join() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can just use the worker.terminate() instead
| assert hasattr(landing_info, "angle_x") | ||
| assert hasattr(landing_info, "angle_y") | ||
| assert hasattr(landing_info, "target_dist") | ||
| local_logger.info("--- Test 1 Passed ---") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of using the logger, you can use print statements so these can be seen in the terminal while running tests
|
reviewed! |
| input_data = input_queue.queue.get_nowait() | ||
| except queue.Empty: | ||
| # No data available, continue | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick but use continue
Completed the autonomous landing module. Tackled the following problems in the ticket:
Multiple detections? Which one do we choose to go to?
Implemented a flexible detection selection system with four different approaches:
• NEAREST_TO_CENTER (default): Chooses the detection closest to the image center
• LARGEST_AREA: Selects the detection with the biggest bounding box
• HIGHEST_CONFIDENCE: Picks the detection with the highest confidence score
• FIRST_DETECTION: Uses the first detection in the list
How/where does this get integrated in the worker pipeline?
The auto_landing_worker() function follows the worker pattern:
• Input: Receives merged_odometry_detections from an input queue
• Output: Sends landing_info to an output queue
• Commands: Receives control commands for state changes from a command queue
How do we turn it on/off? How do we signal this?
Uses AutoLandingCommand and AutoLandingController. The command wrapper carries two string commands ("enable"/"disable") and is sent through the command queue to be processed. The controller layer has enable() and disable() methods, and is_enabled() for status queries. The process_detections() only processes when enabled.
Let me know if anything needs changing.
Note: Some of the brightspot tests are failing so it doesn't pass the pytest check. I believe they were already failing beforehand.