-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
d959d38
commit 0feca16
Showing
7 changed files
with
346 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import serial | ||
import time | ||
|
||
arduino = serial.Serial(port='/dev/ttyUSB0', baudrate=115200, timeout=.1) | ||
time_divisor = 10 | ||
|
||
|
||
def write_serial(x): | ||
arduino.write(bytes(x, 'utf-8')) | ||
time.sleep(0.05) | ||
|
||
|
||
def turnRight(angle): | ||
command = "0" + str(angle).zfill(3) + "0000" | ||
write_serial(command) | ||
time.sleep(angle * (time_divisor) / 1000) | ||
|
||
|
||
def turnLeft(angle): | ||
command = "1" + str(angle).zfill(3) + "0000" | ||
write_serial(command) | ||
time.sleep(angle * (time_divisor) / 1000) | ||
|
||
|
||
def goForward(dist): | ||
print('hello') | ||
command = "00001" + str(dist).zfill(3) | ||
print("forward: " + command) | ||
write_serial(command) | ||
time.sleep(dist * (time_divisor) / 1000) | ||
|
||
|
||
def goBackward(dist): | ||
command = "00000" + str(dist).zfill(3) | ||
print("backward: " + command) | ||
write_serial(command) | ||
time.sleep(dist * (time_divisor) / 1000) | ||
|
||
if __name__ == "__main__": | ||
goForward(100) | ||
time.sleep(2) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
import math | ||
|
||
|
||
CAMERA_FOV_RADIANS = (68.61668666 * math.pi / 180, 65.68702522 * math.pi / 180) # horizontal, vertical | ||
CAMERA_DIMENSIONS_PIXELS = (640, 480) | ||
CAMERA_HEIGHT_ABOVE_GROUND_METERS = .084 | ||
|
||
ROBOT_SECONDS_PER_METER = 20 | ||
ROBOT_SECONDS_PER_DEGREE = 0.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import numpy as np | ||
import matplotlib.pyplot as plt | ||
import math | ||
import cv2 | ||
from typing import List, Tuple | ||
from config import * | ||
|
||
|
||
def conversion(x: int, c: int, theta: float): | ||
return math.atan2((x - c / 2) * math.tan(theta / 2), c / 2) | ||
|
||
|
||
def get_point_angle(point: Tuple[int, int], camera_dimension: Tuple[int, int], camera_fov: Tuple[float, float]) -> Tuple[float, float]: | ||
return (conversion(point[0], camera_dimension[0], camera_fov[0]), | ||
conversion(point[1], camera_dimension[1], camera_fov[1])) | ||
|
||
|
||
def bounding_box_angle(box: List[Tuple[int, int]], camera_dimension: Tuple[int, int], camera_fov: Tuple[float, float]) -> List[Tuple[float, float]]: | ||
return [get_point_angle(point, camera_dimension, camera_fov) for point in box] | ||
|
||
|
||
def distance_estimation(angles: Tuple[float, float], h: float) -> Tuple[float, float]: | ||
l = h / math.tan(angles[1]) | ||
w = l * math.tan(angles[0]) | ||
return w, l | ||
|
||
|
||
def main(): | ||
res1 = bounding_box_angle([(34, 346)], CAMERA_DIMENSIONS_PIXELS, CAMERA_FOV_RADIANS) | ||
print(res1) | ||
res2 = distance_estimation(res1[0], CAMERA_HEIGHT_ABOVE_GROUND_METERS) | ||
print(res2) | ||
|
||
|
||
|
||
# vid = cv2.VideoCapture(0) | ||
# | ||
# p = (1, 1) | ||
# target = 0.0775 | ||
# differential = 0.0001 | ||
# | ||
# while True: | ||
# ret, frame = vid.read() | ||
# if not ret: | ||
# print("Failed to capture frame") | ||
# break | ||
# | ||
# l, r, m = 0, math.pi, 0 | ||
# | ||
# while (r - l) > differential: | ||
# m = (l + r) / 2 | ||
# | ||
# m_angles = bounding_box_angle( | ||
# [p] * 4, | ||
# CAMERA_DIMENSIONS_PIXELS, (m, CAMERA_FOV_RADIANS[1]))[0] | ||
# | ||
# distances = distance_estimation(m_angles, CAMERA_HEIGHT_ABOVE_GROUND_METERS) | ||
# | ||
# distance = distances[0] | ||
# | ||
# if distance > target: | ||
# r = m - differential | ||
# elif distance < target: | ||
# l = m + differential | ||
# else: | ||
# break | ||
# | ||
# # Convert distance to string for display | ||
# distance_str = "Distance: {:.2f} meters".format(distance) | ||
# | ||
# # Convert pixel measurements to string for display | ||
# pixel_measure_str = "Pixel measurement: {} pixels".format(p) | ||
# | ||
# # Update text on the frame | ||
# cv2.putText(frame, distance_str, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) | ||
# cv2.putText(frame, pixel_measure_str, (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) | ||
# | ||
# # Display the frame | ||
# cv2.imshow('Webcam Feed', frame) | ||
# | ||
# # Wait for a small amount of time (1 millisecond) | ||
# # If 'q' is pressed, exit the loop | ||
# if cv2.waitKey(1) & 0xFF == ord('q'): | ||
# break | ||
# | ||
# # Read the next frame | ||
# ret, frame = vid.read() | ||
# | ||
# # Release the video capture object and close all windows | ||
# vid.release() | ||
# cv2.destroyAllWindows() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() | ||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
import os | ||
from enum import IntEnum | ||
import time | ||
import shutil | ||
import math | ||
|
||
from PIL import Image | ||
import cv2 | ||
# import serial | ||
import requests | ||
from distance_measurement import bounding_box_angle, distance_estimation | ||
from config import * | ||
from arduino_wheels import goForward, goBackward, turnLeft, turnRight | ||
|
||
""" | ||
camera | ||
navigation movement | ||
sensors (lidar) | ||
navigation algorithm | ||
e-meet camera | ||
horizontal: 68.61668666 degrees | ||
vertical: 65.68702522 z | ||
at 640x480 | ||
""" | ||
|
||
|
||
class RobotState(IntEnum): | ||
SEEKING_TRASH = 0 | ||
MOVING_TO_TRASH = 1 | ||
AT_TRASH = 2 | ||
|
||
|
||
class TrashTarget: | ||
def __init__(self, horizontal_angle: float, distance: float): | ||
self.horizontal_angle = horizontal_angle | ||
self.distance = distance | ||
|
||
|
||
class RobotStateVariables: | ||
def __init__(self, robot_state: RobotState, trash_target: TrashTarget): | ||
self.robot = Robot() | ||
self.robot_state = robot_state | ||
self.trash_target = trash_target | ||
|
||
|
||
class Robot: | ||
def rotate(self, angle_rad: float): | ||
angle_deg = angle_rad * 180 / math.pi | ||
|
||
# TODO: Handle the case where the seconds needed to turn is greater than the maximum amount of seconds allowed | ||
if angle_deg < 0: # turn left | ||
turnLeft(ROBOT_SECONDS_PER_DEGREE * angle_deg) | ||
else: # turn right | ||
turnRight(ROBOT_SECONDS_PER_DEGREE * angle_deg) | ||
|
||
def move_forward(self, distance_meters: float): | ||
goForward(ROBOT_SECONDS_PER_METER * distance_meters) | ||
|
||
def collect_trash(self): | ||
pass | ||
|
||
|
||
image_count = 0 | ||
YOLO_INFERENCE_SERVER_ADDRESS = os.environ.get('YOLO_INFERENCE_SERVER_ADDRESS') | ||
IMAGE_SAVE_DIR = os.environ.get('IMAGE_SAVE_DIR') | ||
|
||
|
||
def setup(): | ||
|
||
if not os.path.exists(IMAGE_SAVE_DIR): | ||
os.makedirs(IMAGE_SAVE_DIR) | ||
else: | ||
for filename in os.listdir(IMAGE_SAVE_DIR): | ||
file_path = os.path.join(IMAGE_SAVE_DIR, filename) | ||
try: | ||
if os.path.isfile(file_path) or os.path.islink(file_path): | ||
os.unlink(file_path) | ||
elif os.path.isdir(file_path): | ||
shutil.rmtree(file_path) | ||
except Exception as e: | ||
print(f'Failed to delete {file_path}. Reason: {e}') | ||
# make image saving directory if not already | ||
# if it exists already clear it | ||
# check if yolo inference server is running | ||
|
||
pass | ||
|
||
|
||
def read_image(cap: cv2.VideoCapture, save_directory): | ||
global image_count | ||
_, frame = cap.read() | ||
im = Image.fromarray(frame) | ||
save_path = save_directory + '/' + str(image_count) + '.jpg' | ||
image_count += 1 | ||
im.save(save_path) | ||
return save_path | ||
|
||
|
||
def do_yolo_inference(image_path: str) -> dict: | ||
with open(image_path, 'rb') as f: | ||
data = f.read() | ||
|
||
print("Sending image") | ||
r = requests.post(YOLO_INFERENCE_SERVER_ADDRESS, data=data) | ||
resp = r.json() | ||
return resp | ||
|
||
|
||
def do_seeking_trash(robot_state: RobotStateVariables, cap: cv2.VideoCapture): | ||
robot_state.robot_state = RobotState.MOVING_TO_TRASH | ||
robot_state.trash_target = TrashTarget(30, 2) | ||
return | ||
|
||
image_path = read_image(cap, IMAGE_SAVE_DIR) | ||
yolo_results = do_yolo_inference(image_path) | ||
if len(yolo_results) == 0: | ||
return | ||
first_piece_of_trash = yolo_results[0] | ||
box = first_piece_of_trash['box'] | ||
x1, y1, x2, y2 = box['x1'], box['y1'], box['x2'], box['y2'] | ||
p = ((x1 + x2) / 2, y1) | ||
res1 = bounding_box_angle([p], CAMERA_DIMENSIONS_PIXELS, CAMERA_FOV_RADIANS) | ||
res2 = distance_estimation(res1[0], CAMERA_HEIGHT_ABOVE_GROUND_METERS) | ||
|
||
robot_state.trash_target = TrashTarget(res1[0][0], math.sqrt(pow(res2[0], 2) + pow(res2[1], 2))) | ||
|
||
|
||
def do_moving_to_trash(robot_state: RobotStateVariables, cap: cv2.VideoCapture): | ||
if robot_state.trash_target is None: | ||
robot_state.robot_state = RobotState(0) | ||
return | ||
|
||
robot_state.robot.rotate(robot_state.trash_target.horizontal_angle * math.pi / 180) | ||
robot_state.robot.move_forward(robot_state.trash_target.distance) | ||
|
||
|
||
def do_at_trash(robot_state: RobotStateVariables, cap: cv2.VideoCapture): | ||
robot_state.robot.collect_trash() | ||
|
||
|
||
handler_functions = { | ||
0: do_seeking_trash, | ||
1: do_moving_to_trash, | ||
2: do_at_trash, | ||
} | ||
|
||
|
||
def main(): | ||
cap = cv2.VideoCapture('/dev/video0') | ||
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) | ||
state = RobotStateVariables(RobotState(0), None) | ||
setup() | ||
|
||
# main loop | ||
while True: | ||
(handler_functions[state.robot_state])(state, cap) | ||
time.sleep(1) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
import time | ||
|
||
import cv2 | ||
from PIL import Image | ||
|
||
image_count = 0 | ||
SAVE_DIR = "" | ||
|
||
|
||
def read_image(cap: cv2.VideoCapture, save_directory): | ||
global image_count | ||
_, frame = cap.read() | ||
im = Image.fromarray(frame) | ||
save_path = save_directory + '/' + str(image_count) + '.jpg' | ||
image_count += 1 | ||
im.save(save_path) | ||
return save_path | ||
|
||
|
||
def main(): | ||
cap = cv2.VideoCapture('/dev/video0') | ||
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) | ||
|
||
for i in range(10): | ||
read_image(cap, 'images') | ||
print("Sleep") | ||
time.sleep(3) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |