diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e76787d --- /dev/null +++ b/.gitignore @@ -0,0 +1,164 @@ +# Ignore vim .swp files +**.swp +**.swo + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/build/ \ No newline at end of file diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..9ad6380 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.8.18 diff --git a/config.ini b/config.ini index 840201f..b5d724f 100644 --- a/config.ini +++ b/config.ini @@ -1,7 +1,7 @@ [CONFIG] -SWIFT_IP=10.0.0.222 +SWIFT_IP=192.168.1.222 SWIFT_PORT=55556 -MBED_IP=10.0.0.101 +MBED_IP=192.168.1.101 MBED_PORT=1001 [ARTRACKER] #dpp is .040625 with logi diff --git a/examples/ar.py b/examples/ar.py old mode 100644 new mode 100755 index ac95cd1..44cca87 --- a/examples/ar.py +++ b/examples/ar.py @@ -1,12 +1,16 @@ from time import sleep +from libs import aruco_tracker +from loguru import logger -import os -from libs import ARTracker - -tracker = ARTracker.ARTracker(['/dev/video1'], write=False, useYOLO = False) #ARTracker requires a list of camera files +tracker = aruco_tracker.ARTracker([0], write=False) # ARTracker requires a list of camera IDs +marker = 1 # change this ARUCO identifier if you need to :) while True: - tracker.findMarker(1)#, id2 = 1) - print('Distance (in cm): ', tracker.distanceToMarker) - print('Angle: ', tracker.angleToMarker) - sleep(.5) + found = tracker.find_marker(marker) + + if found: + print('Distance (in cm): ', tracker.distance_to_marker) + print('Angle: ', tracker.angle_to_marker) + else: + logger.error(f"Didn't find marker {marker}") + sleep(0.5) diff --git a/examples/location.py b/examples/location.py old mode 100644 new mode 100755 index 2fdc518..4df1cda --- a/examples/location.py +++ b/examples/location.py @@ -1,16 +1,16 @@ from time import sleep -from libs import Location +from libs.location import Location -#os.chdir(os.path.dirname(os.path.abspath(__file__))) -if __name__ == '__main__': - l = Location.Location('10.0.0.222','55556') - print('starting gps') - l.start_GPS() - l.start_GPS_thread() - print('reading data') +# os.chdir(os.path.dirname(os.path.abspath(__file__))) +if __name__ == "__main__": + locator = Location("192.168.1.222", "55556") + print("starting gps") + locator.start_GPS() + locator.start_GPS_thread() + print("reading data") while True: - print(l.latitude) - print(l.longitude) - print(l.bearing) + print("latitude:", locator.latitude) + print("longitude:", locator.longitude) + print("bearing:", locator.bearing) print() sleep(1) diff --git a/examples/map.py b/examples/map.py old mode 100644 new mode 100755 index 9c4bc49..04bf4e5 --- a/examples/map.py +++ b/examples/map.py @@ -1,22 +1,13 @@ -from nis import maps -import os -from random import randint, random - -from scipy import rand -os.chdir(os.path.dirname(os.path.abspath(__file__))) - import threading - import sys -sys.path.append('../../Mission Control/RoverMap/') +sys.path.append("../../Mission Control/RoverMap/") from server import MapServer -from libs import Location - -if __name__ == '__main__': +from libs import location - loc = Location.Location('10.0.0.222', '55556') - print('Starting GPS') +if __name__ == "__main__": + loc = location.Location("10.0.0.222", "55556") + print("Starting GPS") loc.start_GPS() loc.start_GPS_thread() @@ -28,14 +19,14 @@ def set_interval(func, sec): def func_wrapper(): set_interval(func, sec) func() + t = threading.Timer(sec, func_wrapper) t.start() return t def update(): print("sending update...") - #mapServer.update_rover_coords([38.4375 + randint(0, 100) / 10000 , -110.8125]) + # mapServer.update_rover_coords([38.4375 + randint(0, 100) / 10000 , -110.8125]) mapServer.update_rover_coords([loc.latitude, loc.longitude]) - - set_interval(update, 0.500) + set_interval(update, 0.500) diff --git a/examples/wheels.py b/examples/wheels.py old mode 100644 new mode 100755 index 711c303..b293495 --- a/examples/wheels.py +++ b/examples/wheels.py @@ -1,20 +1,22 @@ -#If someone knows a better way to write the next 5 lines, lmk -import os -os.chdir(os.path.dirname(os.path.abspath(__file__))) +from time import sleep +from libs import udp_out -import sys -sys.path.append('../') - -from libs import UDPOut - -HOST = '127.0.0.1' +HOST = "192.168.1.101" # 127.0.0.1 is the 'loopback' address, or the address # of your own computer -PORT = 2000 +PORT = 1001 # this number is arbitrary as long as it is above 1024 +speed = 90 -UDPOut.sendWheelSpeeds(HOST, PORT, 255, 255, 255, 255, 255, 255) +udp_out.send_wheel_speeds(HOST, PORT, speed, speed, speed, speed, speed, speed) # the six arguments represent each wheels speed (on a scale between 0 and 255, since it will be stored in a byte). -# the order of the wheels in the arguments is Front Left, Middle Left, Rear Left, Front Right, Middle Right, and Rear Right. +# the order of the wheels in the arguments is Front Left, Middle Left, Rear Left, Front Right, Middle Right, and Rear Right. # sendWheelSpeeds(HOST, PORT, fl, ml, rl, rt, mr, rr) + +if __name__ == "__main__": + print("Sup") + speed = 90 + while True: + udp_out.send_wheel_speeds(HOST, PORT, speed, speed, speed, speed, speed, speed) + sleep(1) diff --git a/findFocalLength.py b/findFocalLength.py index d9454e5..85196f7 100755 --- a/findFocalLength.py +++ b/findFocalLength.py @@ -1,65 +1,68 @@ #!/usr/bin/python3 -#This programs returns the camera focal length assuming tag is tag 0, 20cm wide and 100cm away +# This programs returns the camera focal length assuming tag is tag 0, 20cm wide and 100cm away import cv2 import cv2.aruco as aruco -import numpy as np import os import configparser from time import sleep -#opens the camera -cam = cv2.VideoCapture('/dev/video0') +# opens the camera +cam = cv2.VideoCapture("/dev/video0") if not cam.isOpened(): print("Camera not connected") exit(-1) -#sets up the config stuff +# sets up the config stuff config = configparser.ConfigParser(allow_no_value=True) -config.read(os.path.dirname(__file__) + '/config.ini') +config.read(os.path.dirname(__file__) + "/config.ini") -#sets the resolution and 4cc -format = config['ARTRACKER']['FORMAT'] -cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(format[0], format[1], format[2], format[3])) +# sets the resolution and 4cc +format = config["ARTRACKER"]["FORMAT"] +cam.set( + cv2.CAP_PROP_FOURCC, + cv2.VideoWriter.fourcc(format[0], format[1], format[2], format[3]), +) -dppH = float(config['ARTRACKER']['DEGREES_PER_PIXEL']) -dppV = float(config['ARTRACKER']['VDEGREES_PER_PIXEL']) -width = int(config['ARTRACKER']['FRAME_WIDTH']) -height = int(config['ARTRACKER']['FRAME_HEIGHT']) +dppH = float(config["ARTRACKER"]["DEGREES_PER_PIXEL"]) +dppV = float(config["ARTRACKER"]["VDEGREES_PER_PIXEL"]) +width = int(config["ARTRACKER"]["FRAME_WIDTH"]) +height = int(config["ARTRACKER"]["FRAME_HEIGHT"]) cam.set(cv2.CAP_PROP_FRAME_WIDTH, width) cam.set(cv2.CAP_PROP_FRAME_HEIGHT, height) -tagDict = aruco.Dictionary_get(aruco.DICT_4X4_50) +tagDict = aruco.getPredefinedDictionary(aruco.DICT_4X4_50) while True: try: - #takes image and detects markers + # takes image and detects markers ret, image = cam.read() - corners, markerIDs, rejected = aruco.detectMarkers(image, tagDict) - + detector = aruco.ArucoDetector(tagDict) + (corners, markerIDs, rejected) = detector.detectMarkers(image) + tagWidth = 0 - if not markerIDs is None and markerIDs[0] == 4: + if markerIDs is not None and markerIDs[0] == 4: tagWidth = corners[0][0][1][0] - corners[0][0][0][0] - - #IMPORTANT: Assumes tag is 20cm wide and 100cm away + + # IMPORTANT: Assumes tag is 20cm wide and 100cm away focalLength = (tagWidth * 100) / 20 print("Focal length: ", focalLength) - centerXMarker = (corners[0][0][1][0] + corners[0][0][0][0]) / 2 - angleToMarkerH = dppH * (centerXMarker - width/2) - print('Horizontal angle: ', angleToMarkerH) - - centerYMarker = (corners[0][0][0][1] + corners[0][0][2][1]) / 2 - angleToMarkerV = -1 *dppV * (centerYMarker - height/2) - print('Vertical angle: ', angleToMarkerV) + centerXMarker = (corners[0][0][1][0] + corners[0][0][0][0]) / 2 + angleToMarkerH = dppH * (centerXMarker - width / 2) + print("Horizontal angle: ", angleToMarkerH) + + centerYMarker = (corners[0][0][0][1] + corners[0][0][2][1]) / 2 + angleToMarkerV = -1 * dppV * (centerYMarker - height / 2) + print("Vertical angle: ", angleToMarkerV) else: print("Nothing found") - - sleep(.1) - + + sleep(0.1) + except KeyboardInterrupt: break - except: - pass #When running multiple times in a row, must cycle through a few times - -cam.release() #releases the camera + except Exception: + pass # When running multiple times in a row, must cycle through a few times + +cam.release() # releases the camera diff --git a/libs/ARTracker.py b/libs/ARTracker.py deleted file mode 100644 index 8ee90ae..0000000 --- a/libs/ARTracker.py +++ /dev/null @@ -1,286 +0,0 @@ -import cv2 -import cv2.aruco as aruco -import numpy as np -import configparser -import sys -from time import sleep -import os -''' -darknetPath = os.path.dirname(os.path.abspath(__file__)) + '/../YOLO/darknet/' -sys.path.append(darknetPath) -from darknet_images import * -from darknet import load_network -''' - -class ARTracker: - - # Constructor - #Cameras should be a list of file paths to cameras that are to be used - #set write to True to write to disk what the cameras are seeing - #set useYOLO to True to use yolo when attempting to detect the ar tags - def __init__(self, cameras, write=False, useYOLO = False, configFile="config.ini"): - self.write=write - self.distanceToMarker = -1 - self.angleToMarker = -999.9 - self.index1 = -1 - self.index2 = -1 - self.useYOLO = useYOLO - self.cameras = cameras - - # Open the config file - config = configparser.ConfigParser(allow_no_value=True) - if not config.read(configFile): - print(f"ERROR OPENING AR CONFIG:", end="") - if os.path.isabs(configFile): - print(configFile) - else: - print("{os.getcwd()}/{configFile}") - exit(-2) - - # Set variables from the config file - self.degreesPerPixel = float(config['ARTRACKER']['DEGREES_PER_PIXEL']) - self.vDegreesPerPixel = float(config['ARTRACKER']['VDEGREES_PER_PIXEL']) - self.focalLength = float(config['ARTRACKER']['FOCAL_LENGTH']) - self.focalLength30H = float(config['ARTRACKER']['FOCAL_LENGTH30H']) - self.focalLength30V = float(config['ARTRACKER']['FOCAL_LENGTH30V']) - self.knownMarkerWidth = float(config['ARTRACKER']['KNOWN_TAG_WIDTH']) - self.format = config['ARTRACKER']['FORMAT'] - self.frameWidth = int(config['ARTRACKER']['FRAME_WIDTH']) - self.frameHeight = int(config['ARTRACKER']['FRAME_HEIGHT']) - - #sets up yolo - if useYOLO: - os.chdir(darknetPath) - weights = config['YOLO']['WEIGHTS'] - cfg = config['YOLO']['CFG'] - data = config['YOLO']['DATA'] - self.thresh = float(config['YOLO']['THRESHOLD']) - self.network, self.class_names, self.class_colors = load_network(cfg, data, weights, 1) - os.chdir(os.path.dirname(os.path.abspath(__file__))) - - self.networkWidth = darknet.network_width(self.network) - self.networkHeight = darknet.network_height(self.network) - - # Initialize video writer, fps is set to 5 - if self.write: - self.videoWriter = cv2.VideoWriter("autonomous.avi", cv2.VideoWriter_fourcc( - self.format[0], self.format[1], self.format[2], self.format[3]), 5, (self.frameWidth, self.frameHeight), False) - - # Set the ar marker dictionary - self.markerDict = aruco.Dictionary_get(aruco.DICT_4X4_50) - - # Initialize cameras - self.caps=[] - if isinstance(self.cameras, int): - self.cameras = [self.cameras] - for i in range(0, len(self.cameras)): - #Makes sure the camera actually connects - while True: - cam = cv2.VideoCapture(self.cameras[i]) - if not cam.isOpened(): - print(f"!!!!!!!!!!!!!!!!!!!!!!!!!!Camera ", i, " did not open!!!!!!!!!!!!!!!!!!!!!!!!!!") - cam.release() - continue - cam.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frameHeight) - cam.set(cv2.CAP_PROP_FRAME_WIDTH, self.frameWidth) - cam.set(cv2.CAP_PROP_BUFFERSIZE, 1) # greatly speeds up the program but the writer is a bit wack because of this - cam.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(self.format[0], self.format[1], self.format[2], self.format[3])) - #ret, testIm = self.caps[i].read()[0]: - if not cam.read()[0]: - cam.release() - else: - self.caps.append(cam) - break - - - #helper method to convert YOLO detections into the aruco corners format - def _convertToCorners(self,detections, numCorners): - corners = [] - xCoef = self.frameWidth / self.networkWidth - yCoef = self.frameHeight / self.networkHeight - if len(detections) < numCorners: - print('ERROR, convertToCorners not used correctly') - raise ValueError - for i in range(0, numCorners): - tagData = list(detections[i][2]) #Gets the x, y, width, height - - #YOLO resizes the image so this sizes it back to what we're exepcting - tagData[0] *= xCoef - tagData[1]*= yCoef - tagData[2] *= xCoef - tagData[3] *= yCoef - - #Gets the corners - topLeft = [tagData[0] - tagData[2]/2, tagData[1] - tagData[3]/2] - topRight = [tagData[0] + tagData[2]/2, tagData[1] - tagData[3]/2] - bottomRight = [tagData[0] + tagData[2]/2, tagData[1] + tagData[3]/2] - bottomLeft = [tagData[0] - tagData[2]/2, tagData[1] + tagData[3]/2] - - #appends the corners with the same format as aruco - corners.append([[topLeft, topRight, bottomRight, bottomLeft]]) - - return corners - - #id1 is the main ar tag to track, id2 is if you're looking at a gatepost, image is the image to analyze - def markerFound(self, id1, image, id2=-1): - # converts to grayscale - cv2.cvtColor(image, cv2.COLOR_RGB2GRAY, image) - - self.index1 = -1 - self.index2 = -1 - bw = image #will hold the black and white image - # tries converting to b&w using different different cutoffs to find the perfect one for the current lighting - for i in range(40, 221, 60): - bw = cv2.threshold(image,i,255, cv2.THRESH_BINARY)[1] - (self.corners, self.markerIDs, self.rejected) = aruco.detectMarkers(bw, self.markerDict) - if not (self.markerIDs is None): - print('', end='') #I have not been able to reproduce an error when I have a print statement here so I'm leaving it in - if id2==-1: #single post - self.index1 = -1 - # this just checks to make sure that it found the right marker - for m in range(len(self.markerIDs)): - if self.markerIDs[m] == id1: - self.index1 = m - break - - if self.index1 != -1: - print("Found the correct marker!") - if self.write: - self.videoWriter.write(bw) #purely for debug - cv2.waitKey(1) - break - - else: - print("Found a marker but was not the correct one") - - else: #gate post - self.index1 = -1 - self.index2 = -1 - if len(self.markerIDs) == 1: - print('Only found marker ', self.markerIDs[0]) - else: - for j in range(len(self.markerIDs) - 1, -1,-1): #I trust the biggest markers the most - if self.markerIDs[j][0] == id1: - self.index1 = j - elif self.markerIDs[j][0] == id2: - self.index2 = j - if self.index1 != -1 and self.index2 != -1: - print('Found both markers!') - if self.write: - self.videoWriter.write(bw) #purely for debug - cv2.waitKey(1) - break - - if i == 220: #did not find any AR markers with any b&w cutoff using aruco - #Checks to see if yolo can find a tag - if self.useYOLO: - detections = [] - if not self.write: - #this is a simpler detection function that doesn't return the image - detections = simple_detection(image, self.network, self.class_names, self.thresh) - else: - #more complex detection that returns the image to be written - image, detections = complex_detection(image, self.network, self.class_names, self.class_colors, self.thresh) - #cv2.imwrite('ar.jpg', image) - for d in detections: - print(d) - - if id2 == -1 and len(detections) > 0: - self.corners = self._convertToCorners(detections, 1) - self.index1 = 0 #Takes the highest confidence ar tag - if self.write: - self.videoWriter.write(image) #purely for debug - cv2.waitKey(1) - elif len(detections) > 1: - self.corners = self._convertToCorners(detections, 2) - self.index1 = 0 #takes the two highest confidence ar tags - self.index2 = 1 - if self.write: - self.videoWriter.write(image) #purely for debug - cv2.waitKey(1) - print(self.corners) - - #Not even YOLO saw anything - if self.index1 == -1 or (self.index2 == -1 and id2 != -1): - if self.write: - self.videoWriter.write(image) - #cv2.imshow('window', image) - cv2.waitKey(1) - self.distanceToMarker = -1 - self.angleToMarker = -999 - return False - - if id2 == -1: - centerXMarker = (self.corners[self.index1][0][0][0] + self.corners[self.index1][0][1][0] + \ - self.corners[self.index1][0][2][0] + self.corners[self.index1][0][3][0]) / 4 - # takes the pixels from the marker to the center of the image and multiplies it by the degrees per pixel - self.angleToMarker = self.degreesPerPixel * (centerXMarker - self.frameWidth/2) - - ''' - distanceToAR = (knownWidthOfMarker(20cm) * focalLengthOfCamera) / pixelWidthOfMarker - focalLength = focal length at 0 degrees horizontal and 0 degrees vertical - focalLength30H = focal length at 30 degreees horizontal and 0 degrees vertical - focalLength30V = focal length at 30 degrees vertical and 0 degrees horizontal - realFocalLength of camera = focalLength - + (horizontal angle to marker/30) * (focalLength30H - focalLength) - + (vertical angle to marker / 30) * (focalLength30V - focalLength) - If focalLength30H and focalLength30V both equal focalLength then realFocalLength = focalLength which is good for non huddly cameras - Please note that the realFocalLength calculation is an approximation that could be much better if anyone wants to try to come up with something better - ''' - hAngleToMarker = abs(self.angleToMarker) - centerYMarker = (self.corners[self.index1][0][0][1] + self.corners[self.index1][0][1][1] + \ - self.corners[self.index1][0][2][1] + self.corners[self.index1][0][3][1]) / 4 - vAngleToMarker = abs(self.vDegreesPerPixel * (centerYMarker - self.frameHeight/2)) - realFocalLength = self.focalLength + (hAngleToMarker/30) * (self.focalLength30H - self.focalLength) + \ - (vAngleToMarker/30) * (self.focalLength30V - self.focalLength) - widthOfMarker = ((self.corners[self.index1][0][1][0] - self.corners[self.index1][0][0][0]) + \ - (self.corners[self.index1][0][2][0] - self.corners[self.index1][0][3][0])) / 2 - self.distanceToMarker = (self.knownMarkerWidth * realFocalLength) / widthOfMarker - - else: - centerXMarker1 = (self.corners[self.index1][0][0][0] + self.corners[self.index1][0][1][0] + \ - self.corners[self.index1][0][2][0] + self.corners[self.index1][0][3][0]) / 4 - centerXMarker2 = (self.corners[self.index2][0][0][0] + self.corners[self.index2][0][1][0] + \ - self.corners[self.index2][0][2][0] + self.corners[self.index2][0][3][0]) / 4 - self.angleToMarker = self.degreesPerPixel * ((centerXMarker1 + centerXMarker2)/2 - self.frameWidth/2) - - hAngleToMarker1 = abs(self.vDegreesPerPixel * (centerXMarker1 - self.frameWidth/2)) - hAngleToMarker2 = abs(self.vDegreesPerPixel * (centerXMarker2 - self.frameWidth/2)) - centerYMarker1 = (self.corners[self.index1][0][0][1] + self.corners[self.index1][0][1][1] + \ - self.corners[self.index1][0][2][1] + self.corners[self.index1][0][3][1]) / 4 - centerYMarker2 = (self.corners[self.index2][0][0][1] + self.corners[self.index2][0][1][1] + \ - self.corners[self.index2][0][2][1] + self.corners[self.index2][0][3][1]) / 4 - vAngleToMarker1 = abs(self.vDegreesPerPixel * (centerYMarker1 - self.frameHeight/2)) - vAngleToMarker2 = abs(self.vDegreesPerPixel * (centerYMarker2 - self.frameHeight/2)) - realFocalLength1 = self.focalLength + (hAngleToMarker1/30) * (self.focalLength30H - self.focalLength) + \ - (vAngleToMarker1/30) * (self.focalLength30V - self.focalLength) - realFocalLength2 = self.focalLength + (hAngleToMarker2/30) * (self.focalLength30H - self.focalLength) + \ - (vAngleToMarker2/30) * (self.focalLength30V - self.focalLength) - widthOfMarker1 = ((self.corners[self.index1][0][1][0] - self.corners[self.index1][0][0][0]) + \ - (self.corners[self.index1][0][2][0] - self.corners[self.index1][0][3][0])) / 2 - widthOfMarker2 = ((self.corners[self.index2][0][1][0] - self.corners[self.index2][0][0][0]) + \ - (self.corners[self.index1][0][2][0] - self.corners[self.index1][0][3][0])) / 2 - - #distanceToAR = (knownWidthOfMarker(20cm) * focalLengthOfCamera) / pixelWidthOfMarker - distanceToMarker1 = (self.knownMarkerWidth * realFocalLength1) / widthOfMarker1 - distanceToMarker2 = (self.knownMarkerWidth * realFocalLength2) / widthOfMarker2 - print(f"1: {distanceToMarker1}, 2: {distanceToMarker2}") - self.distanceToMarker = (distanceToMarker1 + distanceToMarker2) / 2 - - return True - - ''' - id1 is the marker you want to look for - specify id2 if you want to look for a gate - cameras=number of cameras to check. -1 for all of them - ''' - def findMarker(self, id1, id2=-1, cameras=-1): - if cameras == -1: - cameras=len(self.caps) - - for i in range(cameras): - ret, frame = self.caps[i].read() - if self.markerFound(id1, frame, id2=id2): - return True - - return False diff --git a/libs/Drive.py b/libs/Drive.py deleted file mode 100644 index 13334ca..0000000 --- a/libs/Drive.py +++ /dev/null @@ -1,285 +0,0 @@ -from threading import Thread -from threading import Timer -import configparser -import os -os.chdir(os.path.dirname(os.path.abspath(__file__))) -import math -from time import sleep -from nis import maps - -import sys -sys.path.append('../../Mission Control/RoverMap/') -from server import MapServer - -from libs import UDPOut -from libs import Location -from libs import ARTracker - -class Drive: - - def __init__(self, baseSpeed, cameras): - self.baseSpeed = baseSpeed - self.tracker = ARTracker.ARTracker(cameras) - - #Starts everything needed by the map - self.mapServer = MapServer() - self.mapServer.register_routes() - self.mapServer.start(debug=False) - self.startMap(self.updateMap, .5) - sleep(.1) - - #sets up the parser - config = configparser.ConfigParser(allow_no_value=True) - config.read(os.path.dirname(__file__) + '/../config.ini') - - #parses config - self.mbedIP = str(config['CONFIG']['MBED_IP']) - self.mbedPort = int(config['CONFIG']['MBED_PORT']) - swiftIP = str(config['CONFIG']['SWIFT_IP']) - swiftPort = str(config['CONFIG']['SWIFT_PORT']) - self.gps = Location.Location(swiftIP, swiftPort) - self.gps.start_GPS() - - self.speeds = [0,0] - self.errorAccumulation = 0.0 - - #starts the thread that sends wheel speeds - self.running = True - t = Thread(target=self.sendSpeed, name=('send wheel speeds'), args=()) - t.daemon = True - t.start() - - - def startMap(self, func, seconds): - def func_wrapper(): - self.startMap(func, seconds) - func() - t = Timer(seconds, func_wrapper) - t.start() - return t - - def updateMap(self): - self.mapServer.update_rover_coords([self.gps.latitude, self.gps.longitude]) - - - #Every 100ms, send the current left and right wheel speeds to the mbeds - def sendSpeed(self): - while self.running: - ls = int(self.speeds[0]) - rs = int(self.speeds[1]) - UDPOut.sendWheelSpeeds(self.mbedIP, self.mbedPort, ls,ls,ls, rs,rs,rs) - sleep(.1) - - #time in milliseconds - #error in degrees - #Gets adjusted speeds based off of error and how long its been off (uses p and i) - def getSpeeds(self,speed, error, time, kp = .35, ki=.000035): - values = [0,0] - - #p and i constants if doing a pivot turn - if speed == 0: - kp = .9 - ki = .001 - - #Updates the error accumulation - self.errorAccumulation += error * time - - #Gets the adjusted speed values - values[0] = speed + (error * kp + self.errorAccumulation * ki) - values[1] = speed - (error * kp + self.errorAccumulation * ki) - - #Gets the maximum speed values depending if it is pivoting or not - min = speed - 30 - max = speed + 30 - if speed == 0: - max += 40 - min -= 40 - if max > 90: - max=90 - if min < -90: - min = -90 - - #Makes sure the adjusted speed values are within the max and mins - if values[0] > max: - values[0] = max - elif values[0] < min: - values[0] = min - if values[1] > max: - values[1] = max - elif values[1] < min: - values[1] = min - - #Makes sure the speeds are >10 or <-10. Wheels lock up if the speeds are <10 and >-10 - if values[0] <= 0 and values[0] > -10: - values[0] = -10 - elif values[0] > 0 and values[0] < 10: - values[0] = 10 - - if values[1] <= 0 and values[1] > -10: - values[1] = -10 - elif values[1] > 0 and values[1] < 10: - values[1] = 10 - - if values[0] <= 0 and values[0] > -40 and speed == 0: - values[0] = -40 - elif values[0] > 0 and values[0] < 40 and speed == 0: - values[0] = 40 - - if values[1] <= 0 and values[1] > -40 and speed == 0: - values[1] = -40 - elif values[1] > 0 and values[1] < 40 and speed == 0: - values[1] = 40 - - return values - - #Cleaner way to print out the wheel speeds - def printSpeeds(self): - print("Left wheels: ", round(self.speeds[0],1)) - print("Right wheels: ", round(self.speeds[1],1)) - - #Drives along a given list of GPS coordinates while looking for the given ar markers - #Keep id2 at -1 if looking for one post, set id1 to -1 if you aren't looking for AR markers - def driveAlongCoordinates(self, locations, id1, id2=-1): - #Starts the GPS - self.gps.start_GPS_thread() - print('Waiting for GPS connection...') - #while self.gps.all_zero: - # continue - print('Connected to GPS') - - #backs up and turns to avoid running into the last detected sign. Also allows it to get a lock on heading - if(id1 > -1): - self.speeds = [-60,-60] - self.printSpeeds() - sleep(2) - self.speeds = [0,0] - self.printSpeeds() - sleep(2) - self.speeds = [80,20] - self.printSpeeds() - sleep(4) - else: - self.speeds = (self.baseSpeed, self.baseSpeed) - self.printSpeeds() - sleep(3) - - #navigates to each location - for l in locations: - self.errorAccumulation = 0 - while self.gps.distance_to(l[0], l[1]) > .0025: #.0025km - bearingTo = self.gps.bearing_to(l[0], l[1]) - print(self.gps.distance_to(l[0], l[1]) ) - self.speeds = self.getSpeeds(self.baseSpeed, bearingTo, 100) #It will sleep for 100ms - sleep(.1) #Sleeps for 100ms - self.printSpeeds() - - if(id1 != -1 and self.tracker.findMarker(id1, id2)): - self.gps.stop_GPS_thread() - print('Found Marker!') - self.speeds = [0,0] - return True - - self.gps.stop_GPS_thread() - print('Made it to location without seeing marker(s)') - self.speeds = [0,0] - return False - - def trackARMarker(self, id1, id2=-1): - stopDistance = 350 #stops when 250cm from markers TODO make sure rover doesn't stop too far away with huddlys - timesNotFound = -1 - self.tracker.findMarker(id1, id2, cameras=1) #Gets and initial angle from the main camera - self.errorAccumulation = 0 - - count = 0 - #Centers the middle camera with the tag - while self.tracker.angleToMarker > 14 or self.tracker.angleToMarker < -14: - if self.tracker.findMarker(id1, id2, cameras=1): #Only looking with the center camera right now - if timesNotFound == -1: - self.speeds = [0,0] - sleep(.5) - self.speeds = [self.baseSpeed, self.baseSpeed] - sleep(.8) - self.speeds = [0,0] - else: - self.speeds = self.getSpeeds(0, self.tracker.angleToMarker, 100) - print(self.tracker.angleToMarker, " ", self.tracker.distanceToMarker) - timesNotFound = 0 - elif timesNotFound == -1: #Never seen the tag with the main camera - if(math.ceil(int(count/20)/5) % 2 == 1): - self.speeds = [self.baseSpeed+5,-self.baseSpeed-5] - else: - self.speeds = [-self.baseSpeed-5,self.baseSpeed+5] - elif timesNotFound < 15: #Lost the tag for less than 1.5 seconds after seeing it with the main camera - timesNotFound += 1 - print(f"lost tag {timesNotFound} times") - else: - self.speeds = [0,0] - print("lost it") #TODO this is bad - timesNotFound = -1 - #return False - self.printSpeeds() - sleep(.1) - count+=1 - self.speeds = [0,0] - sleep(.5) - - if id2 == -1: - self.errorAccumulation = 0 - print("Locked on and ready to track") - - #Tracks down the tag - while self.tracker.distanceToMarker > stopDistance or self.tracker.distanceToMarker == -1: #-1 means we lost the tag - markerFound = self.tracker.findMarker(id1, cameras = 1) #Looks for the tag - - if self.tracker.distanceToMarker > stopDistance: - self.speeds = self.getSpeeds(self.baseSpeed-8, self.tracker.angleToMarker, 100, kp = .5, ki = .0001) - timesNotFound = 0 - print(f"Tag is {self.tracker.distanceToMarker}cm away at {self.tracker.angleToMarker} degrees") - - elif self.tracker.distanceToMarker == -1 and timesNotFound < 10: - timesNotFound += 1 - print(f"lost tag {timesNotFound} times") - - elif self.tracker.distanceToMarker == -1: - self.speeds = [0,0] - print("Lost tag") - return False #TODO this is bad - - self.printSpeeds() - sleep(.1) - - #We scored! - self.speeds = [0,0] - print("In range of the tag!") - return True - else: - #Gets the coords to the point that is 4m infront of the gate posts (get_coordinates expects distance in km) - coords = self.gps.get_coordinates(self.tracker.distanceToMarker/100000.0+.004, self.tracker.angleToMarker) - - self.speeds = [self.baseSpeed, self.baseSpeed] - sleep(5) - - #TODO: test this more after getting the new GPS - ''' - #Rover hasn't been moving for a bit so moves to get correct bearing - self.speeds = [-self.baseSpeed, -self.baseSpeed] - self.printSpeeds() - sleep(2) - self.speeds = [0,0] - self.printSpeeds() - sleep(1) - self.speeds = [self.baseSpeed, self.baseSpeed] - self.printSpeeds() - sleep(2) - - #Drives to the calculated location - while self.gps.distance_to(coords[0], coords[1]) > .003: #TODO: might want to adjust distance here... - bearingTo = self.gps.bearing_to(coords[0], coords[1]) - print(self.gps.distance_to(coords[0], coords[1]) ) - self.speeds = self.getSpeeds(self.baseSpeed, bearingTo, 100) #It will sleep for 100ms - sleep(.1) #Sleeps for 100ms - self.printSpeeds() - ''' - - - diff --git a/libs/Locationf9p.py b/libs/Locationf9p.py old mode 100644 new mode 100755 index 5222f47..a32ba25 --- a/libs/Locationf9p.py +++ b/libs/Locationf9p.py @@ -1,11 +1,13 @@ import sys -sys.path.append('../') + +sys.path.append("../") from math import cos, radians, degrees, sin, atan2, pi, sqrt, asin import threading + # Class that computes functions related to location of Rover # TODO: Make sure that the current GPS outputs coordinates -# in the same format as the swift, and test out +# in the same format as the swift, and test out # heading and see if that should be computed # somewhere outside of __parse class LocationF9P: @@ -28,30 +30,41 @@ def config(self): pass # Returns distance in kilometers between given latitude and longitude - def distance_to(self, lat:float, lon:float): + def distance_to(self, lat: float, lon: float): earth_radius = 6371.301 - delta_lat = (lat - self.latitude) * (pi/180.0) - delta_lon = (lon - self.longitude) * (pi/180.0) + delta_lat = (lat - self.latitude) * (pi / 180.0) + delta_lon = (lon - self.longitude) * (pi / 180.0) - a = sin(delta_lat/2) * sin(delta_lat/2) + cos(self.latitude * (pi/180.0)) * cos(lat * (pi/180.0)) * sin(delta_lon/2) * sin(delta_lon/2) - c = 2 * atan2(sqrt(a), sqrt(1-a)) + a = sin(delta_lat / 2) * sin(delta_lat / 2) + cos( + self.latitude * (pi / 180.0) + ) * cos(lat * (pi / 180.0)) * sin(delta_lon / 2) * sin(delta_lon / 2) + c = 2 * atan2(sqrt(a), sqrt(1 - a)) return earth_radius * c # Calculates difference between given bearing to location and current bearing # Positive is turn right, negative is turn left - def bearing_to(self, lat:float, lon:float): - resultbearing = self.calc_bearing(self.latitude, self.longitude, lat, lon) - self.bearing - return resultbearing + 360 if resultbearing < -180 else (resultbearing - 360 if resultbearing > 180 else resultbearing) + def bearing_to(self, lat: float, lon: float): + result_bearing = ( + self.calc_bearing(self.latitude, self.longitude, lat, lon) - self.bearing + ) + return ( + result_bearing + 360 + if result_bearing < -180 + else (result_bearing - 360 if result_bearing > 180 else result_bearing) + ) # Starts updating fields from the GPS box def start_GPS_thread(self): - if self.device_open_file == None: - print("[Location-f9p.py] start_GPS_thread was called before opening the gps /dev/ entry - start_GPS will be called now") + if self.device_open_file is None: + print( + "[Location-f9p.py] start_GPS_thread was called before opening the gps /dev/ entry - start_GPS will be called now" + ) self.start_GPS() self.running = True - t = threading.Thread(target=self.update_fields_loop, name=( - 'update GPS fields'), args=()) + t = threading.Thread( + target=self.update_fields_loop, name=("update GPS fields"), args=() + ) t.daemon = True t.start() @@ -63,9 +76,10 @@ def start_GPS(self): self.device_open_file = open(self.device_path) def update_fields_loop(self): - while(self.running): - line_read = self.device_open_file.readline() - self.__parse(line_read) + while self.running: + if self.device_open_file is not None: + line_read = self.device_open_file.readline() + self.__parse(line_read) return # Parse NMEA messages read from the GPS. @@ -120,7 +134,9 @@ def __parse(self, message_str): -1 if split[4] == "W" else 1 ) - self.bearing = self.calc_bearing(self.old_latitude, self.old_longitude, self.latitude, self.longitude) + self.bearing = self.calc_bearing( + self.old_latitude, self.old_longitude, self.latitude, self.longitude + ) elif not ( message_str == "\n" or message_str == "\r\n" or message_str == "" ) and not message_str.startswith("$"): @@ -128,26 +144,28 @@ def __parse(self, message_str): "got weird message: " + message_str + " of len " + str(len(split)) ) - # Calculates bearing between two points. + # Calculates bearing between two points. # (0 is North, 90 is East, +/-180 is South, -90 is West) - def calc_bearing(self,lat1:float, lon1:float, lat2:float, lon2:float): - x = cos(lat2 * (pi/180.0)) * sin((lon2-lon1) * (pi/180.0)) - y = cos(lat1 * (pi/180.0)) * sin(lat2 * (pi/180.0)) - sin(lat1 * (pi/180.0)) * cos(lat2 * (pi/180.0)) * cos((lon2-lon1) * (pi/180.0)) - return (180.0/pi) * atan2(x,y) + def calc_bearing(self, lat1: float, lon1: float, lat2: float, lon2: float): + x = cos(lat2 * (pi / 180.0)) * sin((lon2 - lon1) * (pi / 180.0)) + y = cos(lat1 * (pi / 180.0)) * sin(lat2 * (pi / 180.0)) - sin( + lat1 * (pi / 180.0) + ) * cos(lat2 * (pi / 180.0)) * cos((lon2 - lon1) * (pi / 180.0)) + return (180.0 / pi) * atan2(x, y) # Calculate latitutde and longitude given distance (in km) and bearing (in degrees) - def get_coordinates(self, distance:float, bearing:float): + def get_coordinates(self, distance: float, bearing: float): # https://stackoverflow.com/questions/7222382/get-lat-long-given-current-point-distance-and-bearing R = 6371.301 - brng = radians(bearing) # Assuming bearing is in degrees + brng = radians(bearing) # Assuming bearing is in degrees d = distance - lat1 = radians(self.latitude) # Current lat point converted to radians + lat1 = radians(self.latitude) # Current lat point converted to radians lon1 = radians(self.longitude) # Current long point converted to radians - lat2 = asin(sin(lat1)*cos(d/R) + - cos(lat1)*sin(d/R)*cos(brng)) - lon2 = lon1 + atan2(sin(brng)*sin(d/R)*cos(lat1), - cos(d/R)-sin(lat1)*sin(lat2)) + lat2 = asin(sin(lat1) * cos(d / R) + cos(lat1) * sin(d / R) * cos(brng)) + lon2 = lon1 + atan2( + sin(brng) * sin(d / R) * cos(lat1), cos(d / R) - sin(lat1) * sin(lat2) + ) return degrees(lat2), degrees(lon2) diff --git a/libs/UDPOut.py b/libs/UDPOut.py deleted file mode 100644 index 249bd07..0000000 --- a/libs/UDPOut.py +++ /dev/null @@ -1,63 +0,0 @@ -import socket - -from numpy import byte - -def sendUDP(HOST,PORT,message): - #sends a message over UDP to a specific host and port - BUFFERSIZE = 1024 - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.connect((HOST, PORT)) - s.sendall(message) - -def sendWheelSpeeds(HOST, PORT, fl,ml,rl,fr,mr,rr): - #sends a udp message containing the six different wheel speeds. - #arguments correspond to front left (fl), middle left (ml), etc. - msg = bytearray(9) - - #start byte is the # character - msg[0] = 0x23 - #byte to tell us what type of message it is. - msg[1] = 0x00 #using hexadecimal since if the code changes, it will likely have to be in hex - - #Add the speeds to the message, while simultaneously summing them for the verification bit - speeds = [fl,ml,rl,fr,mr,rr] - #print(speeds) - speed_fix = [] - for i in speeds: - x = (i/90.0 + 1) * 126 - speed_fix.append(int(x)) - - #print(speed_fix) - cs = 0 - for i in range(6): - msg[i + 2] = speed_fix[i] - cs += speed_fix[i] - - #add verification bit - msg[8] = cs&0xff #capped at 8 binary characters of length - - #print(msg) - #send wheel speeds - sendUDP(HOST,PORT,msg) - -def sendLED(HOST, PORT, color): - red = 0 - green = 0 - blue = 0 - msg = bytearray(5) - msg[0] = 0x23 - msg[1] = 0x02 - if color == 'r': - red = 255 - elif color == 'g': - green = 255 - elif color == 'b': - blue = 255 - msg[2] = red - msg[3] = green - msg[4] = blue - sendUDP(HOST,PORT,msg) - -if __name__ == "__main__": - speed = -90 - sendWheelSpeeds('127.0.0.1', 8080, speed, speed, speed, speed, speed, speed) diff --git a/libs/aruco_tracker.py b/libs/aruco_tracker.py new file mode 100755 index 0000000..abf00e3 --- /dev/null +++ b/libs/aruco_tracker.py @@ -0,0 +1,229 @@ +import configparser +import os +from time import sleep +from typing import List + +import cv2 +import cv2.aruco as aruco +from loguru import logger + + +# TODO(bray): `dataclasses` +class ARTracker: + # TODO: Add type declarations + # TODO: Can we not have our initialization function be over 80 lines? + def __init__( + self, + cameras: List[int], + write: bool = False, + config_file: str = "config.ini", + ) -> None: + """ + Constructs a new ARTracker. + + - `cameras` is a list of OpenCV camera values. + - `write` tells the camera to save video files. + - `config_file` is a path to the config file (relative or absolute). + """ + + self.write = write + self.distance_to_marker = -1 + self.angle_to_marker = -999.9 + self.index1 = -1 + self.index2 = -1 + self.cameras = cameras + + # Open the config file + # TODO: Could this be in a function + config = configparser.ConfigParser(allow_no_value=True) + if not config.read(config_file): + logger.info("ERROR OPENING AR CONFIG:") + if os.path.isabs(config_file): + print(config_file) # print the absolute path + else: + # print the full relative path + print(f"{os.getcwd()}/{config_file}") + exit(-2) + + # Set variables from the config file + self.degrees_per_pixel = float(config["ARTRACKER"]["DEGREES_PER_PIXEL"]) + self.vdegrees_per_pixel = float(config["ARTRACKER"]["VDEGREES_PER_PIXEL"]) + self.focal_length = float(config["ARTRACKER"]["FOCAL_LENGTH"]) + self.focal_length30H = float(config["ARTRACKER"]["FOCAL_LENGTH30H"]) + self.focal_length30V = float(config["ARTRACKER"]["FOCAL_LENGTH30V"]) + self.known_marker_width = float(config["ARTRACKER"]["KNOWN_TAG_WIDTH"]) + self.format = config["ARTRACKER"]["FORMAT"] + self.frame_width = int(config["ARTRACKER"]["FRAME_WIDTH"]) + self.frame_height = int(config["ARTRACKER"]["FRAME_HEIGHT"]) + + # Initialize video writer, fps is set to 5 + if self.write: + self.video_writer = cv2.VideoWriter( + "autonomous.avi", + cv2.VideoWriter_fourcc( + self.format[0], self.format[1], self.format[2], self.format[3] + ), + 5, + (self.frame_width, self.frame_height), + False, + ) + + # Set the ARUCO marker dictionary + self.marker_dict = aruco.Dictionary_get(aruco.DICT_4X4_50) + + # Initialize cameras + # TODO: Could this be in a function + self.caps = [] + if isinstance(self.cameras, int): + self.cameras = [self.cameras] + for i in range(0, len(self.cameras)): + # Makes sure the camera actually connects + while True: + cam = cv2.VideoCapture(self.cameras[i]) + if not cam.isOpened(): + logger.warning(f"!!!!!!!!!!!!Camera {i} did not open!!!!!!!!!!!!!!") + cam.release() + sleep(0.4) # wait a moment to try again! + continue + cam.set(cv2.CAP_PROP_FRAME_HEIGHT, self.frame_height) + cam.set(cv2.CAP_PROP_FRAME_WIDTH, self.frame_width) + + # greatly speeds up the program but the writer is a bit wack because of this + # (it makes the camera store only one frame at a time) + cam.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + # fourcc is the codec used to encode the video (generally) + cam.set( + cv2.CAP_PROP_FOURCC, + cv2.VideoWriter_fourcc( + self.format[0], self.format[1], self.format[2], self.format[3] + ), + ) + # the 0th value of the tuple is a bool, true if we got a frame + # so, in other words: if we didn't read, stop using the camera device + if not cam.read()[0]: + cam.release() + else: # if we did get a frame... + self.caps.append(cam) # add to our list of cameras + break + + # id1 is the main ar tag to telating to id2, since gateposts are no longrack, id2 is not relevant, image is the image to analyze + # TODO: Get rid of anything relating to id2 + # markerFound + def is_marker_found(self, id1: int, image, id2: int = -1) -> bool: + """ + Attempts to find a marker with the given `id1` in the provided `image`. + + Returns true if found. + """ + + found = False + + # converts to grayscale + grayscale = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY, image) + + self.index1 = -1 + self.index2 = -1 + bw = grayscale + # tries converting to b&w using different different cutoffs to find the perfect one for the current lighting + for i in range(40, 221, 60): + # FIXME(bray): use logan's pr man! + bw = cv2.threshold(grayscale, i, 255, cv2.THRESH_BINARY)[1] + (self.corners, self.marker_IDs, self.rejected) = aruco.detectMarkers( + bw, self.marker_dict + ) + if self.marker_IDs is not None: + self.index1 = -1 + # this just checks to make sure that it found the right marker + for m in range(len(self.marker_IDs)): + if self.marker_IDs[m] == id1: + self.index1 = m + break + + if self.index1 != -1: # if we changed the id of the ARUCO tag... + logger.info("Found the correct marker!") + found = True + if self.write: + self.video_writer.write(bw) # purely for debug + # FIXME(bray): this is likely a bug. wait_key()'s delay + # input is in milliseconds! and 1 ms is not very long... + cv2.waitKey(1) + break + + # we need to check if we've found it or not. so... + if self.index1 != -1: + # TODO(teeler): Can we just replace this with cv2.arcuo.estimatePoseSingleMarkers(...) + center_x_marker = ( + self.corners[self.index1][0][0][0] + + self.corners[self.index1][0][1][0] + + self.corners[self.index1][0][2][0] + + self.corners[self.index1][0][3][0] + ) / 4 + # takes the pixels from the marker to the center of the image and multiplies it by the degrees per pixel + # FIXME(bray): wow yeah thanks for the comment, but why? + # FIXME(llar): this is making me cry, is this referencing coordinates on the aruco tag?? using x / y coordinate axis? + self.angle_to_marker = self.degrees_per_pixel * ( + center_x_marker - self.frame_width / 2 + ) + + """ + distanceToAR = (knownWidthOfMarker(20cm) * focalLengthOfCamera) / pixelWidthOfMarker + focal_length = focal length at 0 degrees horizontal and 0 degrees vertical + focal_length30H = focal length at 30 degreees horizontal and 0 degrees vertical + focal_length30V = focal length at 30 degrees vertical and 0 degrees horizontal + realFocalLength of camera = focal_length + + (horizontal angle to marker/30) * (focal_length30H - focal_length) + + (vertical angle to marker / 30) * (focal_length30V - focal_length) + If focal_length30H and focal_length30V both equal focal_length then realFocalLength = focal_length which is good for non huddly cameras + Please note that the realFocalLength calculation is an approximation that could be much better if anyone wants to try to come up with something better + """ + # hangle, vangle = horizontal, vertical angle + hangle_to_marker = abs(self.angle_to_marker) + center_y_marker = ( + self.corners[self.index1][0][0][1] + + self.corners[self.index1][0][1][1] + + self.corners[self.index1][0][2][1] + + self.corners[self.index1][0][3][1] + ) / 4 + vangle_to_marker = abs( + self.vdegrees_per_pixel * (center_y_marker - self.frame_height / 2) + ) + realFocalLength = ( + self.focal_length + + (hangle_to_marker / 30) * (self.focal_length30H - self.focal_length) + + (vangle_to_marker / 30) * (self.focal_length30V - self.focal_length) + ) + width_of_marker = ( + ( + self.corners[self.index1][0][1][0] + - self.corners[self.index1][0][0][0] + ) + + ( + self.corners[self.index1][0][2][0] + - self.corners[self.index1][0][3][0] + ) + ) / 2 + self.distance_to_marker = ( + self.known_marker_width * realFocalLength + ) / width_of_marker + + return found + + def find_marker(self, id1: int, id2: int = -1, cameras: int = -1) -> bool: + """ + This method attempts to find a marker with the given ID. + Returns true if found. + + `id1` is the marker you want to look for. + `cameras` is the number of cameras to check. -1 for all of them + """ + + if cameras == -1: + cameras = len(self.caps) + + for i in range(cameras): + ret, frame = self.caps[i].read() + if self.is_marker_found(id1, frame, id2=id2): + return True + + return False diff --git a/libs/drive.py b/libs/drive.py new file mode 100755 index 0000000..c69f30b --- /dev/null +++ b/libs/drive.py @@ -0,0 +1,282 @@ +from threading import Thread +from threading import Timer +import configparser +import os +import math +from time import sleep +from loguru import logger + +# FIXME(bray): we should be building rovermap into a package. that'd make importing easier +from maps.server import MapServer + +from libs import udp_out +from libs import location +from libs import aruco_tracker + + +class Drive: + def __init__(self, base_speed, cameras): + self.base_speed = base_speed + self.tracker = aruco_tracker.ARTracker(cameras) + + # Starts everything needed by the map + self.map_server = MapServer() + self.map_server.register_routes() + self.map_server.start(debug=False) + self.start_map(self.update_map, 0.5) + sleep(0.1) + + # sets up the parser + config = configparser.ConfigParser(allow_no_value=True) + config.read(os.path.dirname(__file__) + "/../config.ini") + + # parses config + self.mbed_IP = str(config["CONFIG"]["MBED_IP"]) + self.mbed_port = int(config["CONFIG"]["MBED_PORT"]) + swift_IP = str(config["CONFIG"]["SWIFT_IP"]) + swift_port = str(config["CONFIG"]["SWIFT_PORT"]) + self.gps = location.Location(swift_IP, swift_port) + self.gps.start_GPS() + + self.speeds = [0, 0] + self.error_accumulation = 0.0 + + # starts the thread that sends wheel speeds + self.running = True + t = Thread(target=self.send_speed, name=("send wheel speeds"), args=()) + t.daemon = True + t.start() + + # startMap + def start_map(self, func, seconds): + def func_wrapper(): + self.start_map(func, seconds) + func() + + t = Timer(seconds, func_wrapper) + t.start() + return t + + # updateMap + def update_map(self): + self.map_server.update_rover_coords([self.gps.latitude, self.gps.longitude]) + + # Every 100ms, send the current left and right wheel speeds to the mbeds + # sendSpeed + def send_speed(self): + while self.running: + # left and right speed + ls = int(self.speeds[0]) + rs = int(self.speeds[1]) + udp_out.send_wheel_speeds( + self.mbed_IP, self.mbed_port, ls, ls, ls, rs, rs, rs + ) + sleep(0.1) + + # TODO: use the `pid_controller` library instead + def get_speeds(self, base_speed, error, time, kp=0.35, ki=0.000035): + """! Gets the adjusted wheel speed values using a PID controller + + @param speed The base speed for the rover + @param error The current bearing to the designated coordinate + @param time Miliseconds + @param kp P value + @param ki I value + """ + logger.info("Current bearing to objective:", error) + + # values -> speed_values + speed_values = [0, 0] # [Left wheel speeds, right wheel speeds] + min_wheel_speed = -90 # Minimum accepted speed value + max_wheel_speed = 90 # Maximum accepted speed value + + # Updates the error accumulation + self.error_accumulation += error * time + + # Gets the adjusted speed values + speed_values[0] = base_speed + (error * kp) # + self.error_accumulation * ki) + speed_values[1] = base_speed - (error * kp) # + self.error_accumulation * ki) + + # Makes sure the adjusted speed values are within min and max wheel speed values + speed_values[0] = min(speed_values[0], max_wheel_speed) + speed_values[0] = max(speed_values[0], min_wheel_speed) + + speed_values[1] = min(speed_values[1], max_wheel_speed) + speed_values[1] = max(speed_values[1], min_wheel_speed) + + # Make sure the wheels aren't in deadlock + # Because if they're between -10 and 10 the rover won't move + if speed_values[0] < 0 and speed_values[0] > -10: + speed_values[0] = -30 + elif speed_values[0] > 0 and speed_values[0] < 10: + speed_values[0] = 30 + + if speed_values[1] < 0 and speed_values[1] > -10: + speed_values[1] = -30 + elif speed_values[1] > 0 and speed_values[1] < 10: + speed_values[1] = 30 + + return speed_values + + #def left_wheel_speed(self) -> int: + + # Cleaner way to print out the wheel speeds + # printSpeeds + def print_speeds(self): + logger.info(f"Left wheel speed: {self.speeds[0]}") + logger.info(f"Right: wheel speed: {self.speeds[1]}") + + # Drives along a given list of GPS coordinates while looking for the given ar markers + # Keep aruco_id2 at -1 if looking for one post, set aruco_id1 to -1 if you aren't looking for AR markers + # driveAlongCoordinates + def drive_along_coordinates( + # TODO: use below. doesn't work on old ass python (<3.8) + # self, locations: list[Tuple[float, float]], aruco_id1: int, aruco_id2: int = 1 + self, locations, aruco_id1: int, aruco_id2: int = -1 + ) -> bool: + # Starts the GPS + logger.info("starting gps thread...") + self.gps.start_GPS_thread() + logger.info("gps is up!") + + # backs up and turns to avoid running into the last detected sign. Also allows it to get a lock on heading + if aruco_id1 > -1: + logger.info("avoiding last detected marker...") + self.speeds = [-60, -60] # reverse both sides + self.print_speeds() + sleep(2) + self.speeds = [0, 0] # stop for a moment + self.print_speeds() + sleep(2) + self.speeds = [80, 20] # turn right + self.print_speeds() + sleep(4) + else: + # otherwise, we drive forward + logger.info("no previous marker detected. let's drive forward!") + self.speeds = [self.base_speed, self.base_speed] + self.print_speeds() + sleep(3) + + # navigates to each location + logger.info("navigating to each location...") + for lat, long in locations: + distance_checks: int = 0 + self.error_accumulation = 0 + + while self.gps.distance_to(lat, long) > 0.0025: # .0025km + distance_checks += 1 + bearing_to = self.gps.bearing_to(lat, long) + logger.info(f"distance to location ({lat}, {long}): {self.gps.distance_to(lat, long)}") + self.speeds = self.get_speeds( + # change self.base_speed to base_speed + self.base_speed, + bearing_to, + 200, + ) # It will sleep for 100ms + sleep(0.1) # Sleeps for 100ms + self.print_speeds() + logger.info(f"distance checked {distance_checks} times") + + if aruco_id1 != -1 and self.tracker.find_marker(aruco_id1, aruco_id2): + self.gps.stop_GPS_thread() + logger.info("Found Marker!") + self.speeds = [0, 0] + return True + + self.gps.stop_GPS_thread() + logger.warning("Made it to location without seeing marker(s)") + self.speeds = [0, 0] + return False + + def track_AR_Marker(self, aruco_id1: int, aruco_id2: int = -1): + stop_distance = 350 # stops when 250cm from markers TODO make sure rover doesn't stop too far away with huddlys + times_not_found = -1 + self.tracker.find_marker( + aruco_id1, aruco_id2, cameras=1 + ) # Gets and initial angle from the main camera + self.error_accumulation = 0 + + count = 0 + # Pivot the rover until it is in line with the aruco tag + while self.tracker.angle_to_marker > 14 or self.tracker.angle_to_marker < -14: + # If the tag is in the cameras fov, move towards the aruco tag + if self.tracker.find_marker( + aruco_id1, aruco_id2, cameras=1 + ): # Only looking with the center camera right now + if times_not_found == -1: # FIXME: Why are we checking if it's not found if we see it in the camera's fov + self.speeds = [0, 0] + sleep(0.5) + self.speeds = [self.base_speed, self.base_speed] + sleep(0.8) + self.speeds = [0, 0] + else: # Move towards the rover + self.speeds = self.get_speeds(0, self.tracker.angle_to_marker, 100) + logger.info( + f"{self.tracker.angle_to_marker} {self.tracker.distance_to_marker}" + ) + times_not_found = 0 + # Pivot a back and forth to find rover + # FIXME: I think we can delete this + elif times_not_found == -1: + if math.ceil(int(count / 20) / 5) % 2 == 1: + self.speeds = [self.base_speed + 5, -self.base_speed - 5] + else: + self.speeds = [-self.base_speed - 5, self.base_speed + 5] + elif ( + times_not_found < 15 + ): # Lost the tag for less than 1.5 seconds after seeing it with the main camera + times_not_found += 1 + logger.warning(f"lost tag {times_not_found} times") + else: + self.speeds = [0, 0] + logger.warning("lost it") # TODO this is bad + times_not_found = -1 + # return False + self.print_speeds() + sleep(0.1) + count += 1 + self.speeds = [0, 0] + sleep(0.5) + + self.error_accumulation = 0 + print("Locked on and ready to track") + + # Start moving towards the aruco tag + while ( + self.tracker.distance_to_marker > stop_distance + or self.tracker.distance_to_marker == -1 + ): # -1 means we lost the tag + self.tracker.marker_found = self.tracker.find_marker( + aruco_id1, cameras=1 + ) # Looks for the tag + + if self.tracker.distance_to_marker > stop_distance: + self.speeds = self.get_speeds( + self.base_speed - 8, + self.tracker.angle_to_marker, + 100, + kp=0.5, + ki=0.0001, + ) + times_not_found = 0 + print( + f"Tag is {self.tracker.distance_to_marker}cm away at {self.tracker.angle_to_marker} degrees" + ) + + elif self.tracker.distance_to_marker == -1 and times_not_found < 10: + times_not_found += 1 + print(f"lost tag {times_not_found} times") + + elif self.tracker.distance_to_marker == -1: + self.speeds = [0, 0] + print("Lost tag") + return False # TODO this is bad + + self.print_speeds() + sleep(0.1) + + # We scored! + self.speeds = [0, 0] + print("In range of the tag!") + return True diff --git a/libs/Location.py b/libs/location.py old mode 100644 new mode 100755 similarity index 53% rename from libs/Location.py rename to libs/location.py index 7d3a2cc..0372fcf --- a/libs/Location.py +++ b/libs/location.py @@ -1,12 +1,9 @@ -#import sys -#sys.path.append('../') from gps import gps from math import cos, radians, degrees, sin, atan2, pi, sqrt, asin from threading import Thread from time import sleep -import os -os.chdir(os.path.dirname(os.path.abspath(__file__))) + # Class that computes functions related to location of Rover class Location: def __init__(self, ip, port): @@ -22,7 +19,7 @@ def __init__(self, ip, port): self.bearing = 0.0 self.running = True self.all_zero = True - self.wait_time = 1 + self.wait_time = 0.1 def config(self): # read from a file, probably configure this to work with @@ -30,26 +27,33 @@ def config(self): pass # Returns distance in kilometers between given latitude and longitude - def distance_to(self, lat:float, lon:float): + def distance_to(self, lat: float, lon: float): earth_radius = 6371.301 - delta_lat = (lat - self.latitude) * (pi/180.0) - delta_lon = (lon - self.longitude) * (pi/180.0) + delta_lat = (lat - self.latitude) * (pi / 180.0) + delta_lon = (lon - self.longitude) * (pi / 180.0) - a = sin(delta_lat/2) * sin(delta_lat/2) + cos(self.latitude * (pi/180.0)) * cos(lat * (pi/180.0)) * sin(delta_lon/2) * sin(delta_lon/2) - c = 2 * atan2(sqrt(a), sqrt(1-a)) + a = sin(delta_lat / 2) * sin(delta_lat / 2) + cos( + self.latitude * (pi / 180.0) + ) * cos(lat * (pi / 180.0)) * sin(delta_lon / 2) * sin(delta_lon / 2) + c = 2 * atan2(sqrt(a), sqrt(1 - a)) return earth_radius * c # Calculates difference between given bearing to location and current bearing # Positive is turn right, negative is turn left - def bearing_to(self, lat:float, lon:float): - resultbearing = self.calc_bearing(self.latitude, self.longitude, lat, lon) - self.bearing - return resultbearing + 360 if resultbearing < -180 else (resultbearing - 360 if resultbearing > 180 else resultbearing) + def bearing_to(self, lat: float, lon: float): + result_bearing = ( + self.calc_bearing(self.latitude, self.longitude, lat, lon) - self.bearing + ) + return ( + result_bearing + 360 + if result_bearing < -180 + else (result_bearing - 360 if result_bearing > 180 else result_bearing) + ) # Starts updating fields from the GPS box def start_GPS_thread(self): self.running = True - t = Thread(target=self.update_fields_loop, name=( - 'update GPS fields'), args=()) + t = Thread(target=self.update_fields_loop, name=("update GPS fields"), args=()) t.daemon = True t.start() @@ -63,11 +67,10 @@ def start_GPS(self): def stop_GPS(self): self.running = False - gps.gps_finish() - + gps.gps_finish() def update_fields_loop(self): - while(self.running): + while self.running: if gps.get_latitude() + gps.get_longitude() != 0: self.old_latitude = self.latitude self.old_longitude = self.longitude @@ -77,34 +80,42 @@ def update_fields_loop(self): self.height = gps.get_height() self.time = gps.get_time() self.error = gps.get_error() - self.bearing = self.calc_bearing(self.old_latitude, self.old_longitude, self.latitude, self.longitude) + self.bearing = self.calc_bearing( + self.old_latitude, self.old_longitude, self.latitude, self.longitude + ) self.all_zero = False else: - all_zero = True + self.all_zero = True # maybe print info or sleep or something idk sleep(self.wait_time) return - # Calculates bearing between two points. + # Calculates bearing between two points. # (0 is North, 90 is East, +/-180 is South, -90 is West) - def calc_bearing(self,lat1:float, lon1:float, lat2:float, lon2:float): - x = cos(lat2 * (pi/180.0)) * sin((lon2-lon1) * (pi/180.0)) - y = cos(lat1 * (pi/180.0)) * sin(lat2 * (pi/180.0)) - sin(lat1 * (pi/180.0)) * cos(lat2 * (pi/180.0)) * cos((lon2-lon1) * (pi/180.0)) - return (180.0/pi) * atan2(x,y) + def calc_bearing(self, lat1: float, lon1: float, lat2: float, lon2: float): + x = cos(lat2 * (pi / 180.0)) * sin((lon2 - lon1) * (pi / 180.0)) + y = cos(lat1 * (pi / 180.0)) * sin(lat2 * (pi / 180.0)) - sin( + lat1 * (pi / 180.0) + ) * cos(lat2 * (pi / 180.0)) * cos((lon2 - lon1) * (pi / 180.0)) + new_bearing = (180.0 / pi) * atan2(x, y) + # print(self.bearing - newBearing) + # if abs(self.bearing - newBearing) > 80: + # return self.bearing + return new_bearing # Calculate latitutde and longitude given distance (in km) and bearing (in degrees) - def get_coordinates(self, distance:float, bearing:float): + def get_coordinates(self, distance: float, bearing: float): # https://stackoverflow.com/questions/7222382/get-lat-long-given-current-point-distance-and-bearing R = 6371.301 - brng = radians(bearing) # Assuming bearing is in degrees + brng = radians(bearing) # Assuming bearing is in degrees d = distance - lat1 = radians(self.latitude) # Current lat point converted to radians + lat1 = radians(self.latitude) # Current lat point converted to radians lon1 = radians(self.longitude) # Current long point converted to radians - lat2 = asin(sin(lat1)*cos(d/R) + - cos(lat1)*sin(d/R)*cos(brng)) - lon2 = lon1 + atan2(sin(brng)*sin(d/R)*cos(lat1), - cos(d/R)-sin(lat1)*sin(lat2)) + lat2 = asin(sin(lat1) * cos(d / R) + cos(lat1) * sin(d / R) * cos(brng)) + lon2 = lon1 + atan2( + sin(brng) * sin(d / R) * cos(lat1), cos(d / R) - sin(lat1) * sin(lat2) + ) return degrees(lat2), degrees(lon2) diff --git a/libs/udp_out.py b/libs/udp_out.py new file mode 100755 index 0000000..e749033 --- /dev/null +++ b/libs/udp_out.py @@ -0,0 +1,71 @@ +import socket + + +def send_UDP(HOST, PORT, message): + # sends a message over UDP to a specific host and port + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect((HOST, PORT)) + s.sendall(message) + # sendWheelSpeeds + + +def send_wheel_speeds(HOST, PORT, fl, ml, rl, fr, mr, rr): + # sends a udp message containing the six different wheel speeds. + # arguments correspond to front left (fl), middle left (ml), etc. + msg = bytearray(9) + + # start byte is the # character + msg[0] = 0x01 + # byte to tell us what type of message it is. + msg[1] = ( + 0x01 # using hexadecimal since if the code changes, it will likely have to be in hex + ) + + # Add the speeds to the message, while simultaneously summing them for the verification bit + speeds = [fl, ml, rl, fr, mr, rr] + + # Adjust speed + speed_fix = [] + for i in speeds: + x = (i / 90.0 + 1) * 126 + speed_fix.append(int(x)) + + cs = 0 + for i in range(6): + msg[i + 2] = speed_fix[i] + cs += speed_fix[i] + + # add verification bit + msg[8] = cs & 0xFF # capped at 8 binary characters of length + + # send wheel speeds + # sendUDP + send_UDP(HOST, PORT, msg) + # send_LED + + +def send_LED(HOST, PORT, color): + red = 0 + green = 0 + blue = 0 + msg = bytearray(5) + msg[0] = 0x01 + msg[1] = 0x02 + if color == "r": + red = 255 + elif color == "g": + green = 255 + elif color == "b": + blue = 255 + msg[2] = red + msg[3] = green + msg[4] = blue + send_UDP(HOST, PORT, msg) + + +if __name__ == "__main__": + speed = -90 + send_LED( + "192.168.1.", + 1001, + ) diff --git a/main.py b/main.py old mode 100644 new mode 100755 index 27a710c..df128c9 --- a/main.py +++ b/main.py @@ -1,122 +1,163 @@ -import os import argparse -import configparser -from libs import UDPOut -from libs import Drive +from libs import udp_out +from libs.drive import Drive import threading from time import sleep +from typing import Tuple +from loguru import logger +from pathlib import Path +from configparser import ConfigParser -path = os.path.dirname(os.path.abspath(__file__)) -mbedIP = "10.0.0.101" -mbedPort = 1001 +def flash(mbedIP: str, mbedPort: int) -> None: + """ + Flash the LED colors green. -flashing = False - - -def flash(): - while flashing: - UDPOut.sendLED(mbedIP, mbedPort, "g") + This is used to show that the rover successfully navigated to a goal. + """ + + while True: + udp_out.send_LED(mbedIP, mbedPort, "g") sleep(0.2) - UDPOut.sendLED(mbedIP, mbedPort, "o") + udp_out.send_LED(mbedIP, mbedPort, "o") sleep(0.2) -argParser = argparse.ArgumentParser() -argParser.add_argument( - "cameraInput", type=int, help="takes a number representing which camera to use" + +# Create Argument Parser and parse arguments +arg_parser = argparse.ArgumentParser() +arg_parser.add_argument( + "--cameraid", + type=int, + help="takes an opencv number representing which camera to use", + required=True, ) -argParser.add_argument( - "-id", +arg_parser.add_argument( + "--coords", + type=str, + help="a file that has lines of `lat long` separated by spaces", + required=True, +) +arg_parser.add_argument( "--ids", type=int, - help="takes either 1 or 2 id values, defaults to -1 if id not assigned", + help="takes two aruco ids. If you are only looking for one aruco id, the second id should be -1", nargs="+", + required=True, ) -argParser.add_argument( - "-ll", - "--latLong", - type=str, - help="takes a filename for a text file, then reads that file for latlong coordinates", -) -args = argParser.parse_args() +args = arg_parser.parse_args() + + +# TODO: dataclass this function. returning a tuple is ugly and begging for bugs +# TODO: use typing. broken due to old ass python (<3.8) +#def parse_arguments() -> Tuple[list[int], list[Tuple[float, float]]]: +def parse_arguments(): + """ + Parses the command line arguments. + + The returned tuple contains: + - list[int]: the ARUCO ids we'll look for + - list[Tuple[float, float]]: the GPS coordinates we'll drive to + """ + + # TODO: refactor `id_list` to use None/pattern matching + # the ARUCO tag ids that we're looking for + aruco_ids: list[int] = args.ids.copy() + # coords to drive to (tuple: lat, long) + gps_coordinates: list[Tuple[float, float]] = [] + + # make sure we have two ARUCO ids + if len(aruco_ids) != 2: + raise Exception( + "Error while attempting to parse ARUCO IDs. Please enter 2 IDs to search for." + ) + + # handle the list of coordinates + with open(args.coords) as file: + for line in file: + try: + (lat_str, long_str) = line.split(" ") + + # attempt to parse the strings into two numbers + coordinates: Tuple[float, float] = ( + float(lat_str.replace("\ufeff", "")), + float(long_str.replace("\ufeff", "")), + ) + + # if we made it through, add them to the list + gps_coordinates.append(coordinates) + + except ValueError as e: + raise Exception( + f"Coordinate parsing failed on line: {line}. Error: {e}" + ) + + file.close() # important: gotta close the handle + + logger.info(f"Found the following list of coordinates: {gps_coordinates}") + return (aruco_ids, gps_coordinates) # Gets a list of coordinates from user and drives to them and then tracks the tag # Set id1 to -1 if not looking for a tag -def drive(rover): - global flashing - idList = [-1, -1] - locations = [] - - if args.ids is not None: - for i in range(len(args.ids)): - idList[i] = args.ids[i] - - id1 = idList[0] - id2 = idList[1] - - if args.latLong is not None: - with open(args.latLong) as f: - lineNum = 0 - for line in f: - lineNum += 1 - try: - coords = [ - float(item.replace("\ufeff", "")) - for item in line.strip().split() - ] - except ValueError: - print( - "Parse Error on line " - + str(lineNum) - + ": Please enter " - ) - break - else: - if len(coords) != 2: - print( - "Error on line " - + str(lineNum) - + ": Insufficient number of coordinates. Please enter " - ) - break - locations.append(coords) - f.close() - - flashing = False - UDPOut.sendLED(mbedIP, mbedPort, "r") - rover.driveAlongCoordinates(locations, id1, id2) - - if id1 != -1: - rover.trackARMarker(id1, id2) - - flashing = True - lights = threading.Thread(target=flash) - lights.start() - # UDPOut.sendLED(mbedIP, mbedPort, 'g') - - f = open("Recorded_Coordinates_" + args.latLong + ".txt", "a") - f.write("Latitude: " + rover.gps.latitude + "\n Longitude: " + rover.gps.longitude) - f.close() +# TODO: make this a part of a `Rover` class :3 +def drive( + # TODO: use typing. old ass python (<3.8) + #rover: Drive, gps_coordinates: list[Tuple[float, float]], aruco_ids: list[int] + rover: Drive, gps_coordinates, aruco_ids +) -> bool: + """ + Given a Drive object, navigate to goal + return True if we make it the goal and False if we do not + """ -if __name__ == "__main__": - os.chdir(path) - print(os.getcwd()) - # user input (args from system) - if args.cameraInput is None: - print("ERROR: must at least specify one camera") - exit(-1) - - # gets the mbed ip and port - config = configparser.ConfigParser(allow_no_value=True) - if not config.read("config.ini"): - print("DID NOT OPEN CONFIG") - exit(-2) - mbedIP = str(config["CONFIG"]["MBED_IP"]) - mbedPort = int(config["CONFIG"]["MBED_PORT"]) + # Drive along GPS coordinates + logger.warning("ok we're driving along the gps coords :3") + dac_result = rover.drive_along_coordinates(gps_coordinates, aruco_ids[0], aruco_ids[1]) + logger.error(f"drive along coordinates returned {dac_result}") + + + # TODO: Can we have this run in the rover class instead of returning from 'drive_along_coordinates' + if aruco_ids[0] != -1: + rover.track_AR_Marker(aruco_ids[0], aruco_ids[1]) - rover = Drive.Drive(50, args.cameraInput) + return True + # TODO: Return False? If we don't make it? - drive(rover) + +if __name__ == "__main__": + # Try to read from configuration file + config: ConfigParser = ConfigParser(allow_no_value=True) + if not config.read("config.ini"): + raise Exception( + f"Failed to open configuration file! Please ensure `config.ini` exists in this directory ({Path.cwd()})." + ) + + # Get MBED IP and MBED Port + mbed_ip: str = str(config["CONFIG"]["MBED_IP"]) + mbed_port: int = int(config["CONFIG"]["MBED_PORT"]) + + # Read command line arguments + logger.info("parsing command line arugments") + aruco_ids, gps_coordinates = parse_arguments() + + # Initailze REMI!! + logger.info("creating a Drive object (init remi)") + rover: Drive = Drive(50, args.cameraid) + + logger.info("changing led to indicate autonomous mode") + udp_out.send_LED( + mbed_ip, mbed_port, "r" + ) # Change the LED to red to show REMI is entering Autonomous mode + + # Drive to our goal and if we succeed, flash the LEDs green + if drive(rover, gps_coordinates, aruco_ids): + logger.info("Made it to the goal! Flash the LEDs green!") + lights = threading.Thread(target=lambda: flash(mbed_ip, mbed_port)) + lights.start() + n = input("Press enter to end flashing lights") + logger.info("Exiting Program") + exit() + else: + logger.info("!!!YOU FAILED!!!") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..413804a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "autonomous" +version = "0.1.0" +description = "Add your description here" +authors = [{ name = "Barrett", email = "contact@barretts.club" }] +dependencies = [ + "opencv-python>=4.7", + "flask>=3.0.3", + "loguru>=0.7.2", +] +readme = "README.md" +requires-python = ">= 3.6" +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.rye] +managed = true +dev-dependencies = ["mypy>=1.8.0", "ruff>=0.2.0"] + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.hatch.build.targets.wheel] +packages = ["./"]