vector_click_agent.py
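# Click-based teleoperation agent for a real Vector robot. The agent receives
# marker pose estimates from an aruco server over a local socket, mirrors them
# in a PyBullet simulation, and lets the user select actions by clicking on the
# visualized state; selected actions are then executed on the physical robot.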
import argparse
from multiprocessing.connection import Client

import cv2

import environment
import utils
import vector_action_executor
class ClickAgent:
    def __init__(self, conn, env, action_executor):
        self.conn = conn
        self.env = env
        self.action_executor = action_executor

        self.window_name = 'window'
        cv2.namedWindow(self.window_name, cv2.WINDOW_GUI_NORMAL)
        #cv2.resizeWindow(self.window_name, 960, 960)
        cv2.setMouseCallback(self.window_name, self.mouse_callback)
        self.selected_action = None

    def mouse_callback(self, event, x, y, *_):
        # Store the clicked pixel as (row, col) in the state image
        if event == cv2.EVENT_LBUTTONDOWN:
            self.selected_action = (y, x)

    def run(self):
        try:
            while True:
                # Get new pose estimates
                poses = None
                while self.conn.poll():  # Discard all but the latest data
                    poses = self.conn.recv()
                if poses is None:
                    continue

                # Update poses in the simulation
                self.env.update_poses(poses)

                # Visualize state representation
                state = self.env.get_state()
                cv2.imshow(self.window_name, utils.get_state_visualization(state)[:, :, ::-1])
                cv2.waitKey(1)

                if self.selected_action is not None:
                    # Convert the clicked (row, col) into a flat action index
                    action = self.selected_action[0] * self.env.get_state_width() + self.selected_action[1]
                    self.selected_action = None

                    # Run selected action through simulation
                    try_action_result = self.env.try_action(action)

                    # Update action executor
                    self.action_executor.update_try_action_result(try_action_result)

                # Run action executor
                self.action_executor.step(poses)
        finally:
            self.action_executor.disconnect()
def main(args):
    # Connect to aruco server for pose estimates
    try:
        conn = Client(('localhost', 6000), authkey=b'secret password')
    except ConnectionRefusedError:
        print('Could not connect to aruco server for pose estimates')
        return

    # Create action executor for the real robot
    action_executor = vector_action_executor.VectorActionExecutor(args.robot_index)

    # Create pybullet environment to mirror the real world
    kwargs = {'num_cubes': args.num_cubes, 'use_gui': True}
    cube_indices = list(range(args.num_cubes))
    env = environment.RealEnvironment(robot_index=action_executor.robot_index, cube_indices=cube_indices, **kwargs)
    env.reset()

    # Run the agent
    agent = ClickAgent(conn, env, action_executor)
    agent.run()
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--robot-index', type=int)
    parser.add_argument('--num-cubes', type=int, default=10)
    main(parser.parse_args())
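# Example invocation (assuming the aruco pose server from this project is already
# running and listening on localhost:6000, and that robot index 0 is valid for
# your setup):
#   python vector_click_agent.py --robot-index 0 --num-cubes 10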