Skip to content
This repository was archived by the owner on Nov 13, 2025. It is now read-only.

Conversation

@danielquzhao
Copy link
Member

Completed the autonomous landing module. Tackled the following problems in the ticket:

Multiple detections? Which one do we choose to go to?

Implemented a flexible detection selection system with four different approaches:
• NEAREST_TO_CENTER (default): Chooses the detection closest to the image center
• LARGEST_AREA: Selects the detection with the biggest bounding box
• HIGHEST_CONFIDENCE: Picks the detection with the highest confidence score
• FIRST_DETECTION: Uses the first detection in the list

How/where does this get integrated in the worker pipeline?

The auto_landing_worker() function follows the worker pattern:
• Input: Receives merged_odometry_detections from an input queue
• Output: Sends landing_info to an output queue
• Commands: Receives control commands for state changes from a command queue

How do we turn it on/off? How do we signal this?

Uses AutoLandingCommand and AutoLandingController. The command wrapper carries two string commands ("enable"/"disable") and is sent through the command queue to be processed. The controller layer has enable() and disable() methods, and is_enabled() for status queries. The process_detections() only processes when enabled.

Let me know if anything needs changing.

Note: Some of the brightspot tests are failing so it doesn't pass the pytest check. I believe they were already failing beforehand.

self.__logger = local_logger
self.__selection_strategy = selection_strategy

def _select_detection(self, detections: list) -> Optional[int]:
Copy link
Contributor

@ellyokes253 ellyokes253 Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you don't have to include the typing import and can just specify return type without it

Comment on lines 20 to 21
NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center
LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area
Copy link
Contributor

@ellyokes253 ellyokes253 Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think NEAREST_TO CENTER and LARGEST_AREA are not as applicable, this may depend on where we move this worker around to but right now we have math implemented which makes it irrelevant where the centre of the detection is to the image, and also the area isn't as applicable because it could be a false detection (depends on the accuracy of our model)

NEAREST_TO_CENTER = "nearest_to_center" # Choose detection closest to image center
LARGEST_AREA = "largest_area" # Choose detection with largest bounding box area
HIGHEST_CONFIDENCE = "highest_confidence" # Choose detection with highest confidence
FIRST_DETECTION = "first_detection" # Use first detection in list (original behavior)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iffy about first detection because it also runs on the assumption that the first detection is correct, but leaving for now

Comment on lines 114 to 131
if self.__selection_strategy == DetectionSelectionStrategy.NEAREST_TO_CENTER:
# Find detection closest to image center
image_center_x = self.im_w / 2
image_center_y = self.im_h / 2

min_distance = float("inf")
selected_index = 0

for i, detection in enumerate(detections):
det_center_x, det_center_y = detection.get_centre()
distance = math.sqrt(
(det_center_x - image_center_x) ** 2 + (det_center_y - image_center_y) ** 2
)
if distance < min_distance:
min_distance = distance
selected_index = i

return selected_index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't need this

Comment on lines 133 to 146
if self.__selection_strategy == DetectionSelectionStrategy.LARGEST_AREA:
# Find detection with largest bounding box area
max_area = 0
selected_index = 0

for i, detection in enumerate(detections):
width = detection.x_2 - detection.x_1
height = detection.y_2 - detection.y_1
area = width * height
if area > max_area:
max_area = area
selected_index = i

return selected_index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or this

Comment on lines 108 to 111
if len(detections) == 1:
return 0

if self.__selection_strategy == DetectionSelectionStrategy.FIRST_DETECTION:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could put in the same if statement cause same output

time.sleep(period)


def _process_commands(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think what you're trying to implement is a queue that has a bunch of commands meaning the autolander decides whether or not it wants to autoland, and the fact that it can change from wanting to or not during execution hence calling the function in the worker (correct me if i'm wrong). i think the simplistic way to go is to assume that we will know whether or not we want to autoland, and that decision will not change throughout the execution of the worker once it's decided.

"""

def __init__(self, command: str) -> None:
self.command = command
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what will make the most sense for making auto landing an enable/disable thing would be to include this as a constant in the config.yaml so that it's easily configurable

self.__auto_landing = auto_landing
self.__logger = local_logger
self.__enabled = False
self.__enabled_lock = threading.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by getting rid of the queue that could possibly introduce the race conditions of enabling and disabling auto landing, we won't need the thread lock anymore cause this can only change once

IMAGE_HEIGHT = 640.0 # pixels
IMAGE_WIDTH = 640.0 # pixels
WORKER_PERIOD = 0.1 # seconds
DETECTION_STRATEGY = auto_landing.DetectionSelectionStrategy.NEAREST_TO_CENTER
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will need to change

assert output_queue.queue.empty()

# Test 2: Enable auto-landing and verify it processes detections
enable_command = auto_landing_worker.AutoLandingCommand("enable")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will need this reconfigured for global constant in config.yaml

Comment on lines 177 to 178
detection3 = create_test_detection([100, 100, 200, 200], 1, 0.7) # Far from center
detection4 = create_test_detection([310, 310, 330, 330], 2, 0.8) # Close to center (320, 320)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can change this

assert landing_info2 is not None

# Test 4: Disable auto-landing and verify it stops processing
disable_command = auto_landing_worker.AutoLandingCommand("disable")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above for enable/disable

@danielquzhao danielquzhao requested a review from ellyokes253 July 27, 2025 08:44
Comment on lines 33 to 40
# Ensure logs directory exists and create timestamped subdirectory
LOG_DIR = pathlib.Path("logs")
LOG_DIR.mkdir(parents=True, exist_ok=True)

# Create a timestamped subdirectory for this test session
TIMESTAMP_FORMAT = "%Y-%m-%d_%H-%M-%S"
test_session_dir = LOG_DIR / datetime.datetime.now().strftime(TIMESTAMP_FORMAT)
test_session_dir.mkdir(parents=True, exist_ok=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we wont need to keep logs for the tests, the terminal print statements should suffice

Comment on lines 256 to 258
if worker.is_alive():
worker.terminate()
worker.join()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can just use the worker.terminate() instead

assert hasattr(landing_info, "angle_x")
assert hasattr(landing_info, "angle_y")
assert hasattr(landing_info, "target_dist")
local_logger.info("--- Test 1 Passed ---")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of using the logger, you can use print statements so these can be seen in the terminal while running tests

@ellyokes253
Copy link
Contributor

reviewed!

input_data = input_queue.queue.get_nowait()
except queue.Empty:
# No data available, continue
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick but use continue

@ellyokes253 ellyokes253 merged commit 66b0654 into main Sep 8, 2025
1 check failed
@ellyokes253 ellyokes253 deleted the autonomous-landing-module branch September 8, 2025 03:48
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants