-
Notifications
You must be signed in to change notification settings - Fork 0
/
vio.py
108 lines (79 loc) · 3.06 KB
/
vio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from queue import Queue
from threading import Thread
from config import ConfigEuRoC
from image import ImageProcessor
from msckf import MSCKF
class VIO(object):
def __init__(self, config, img_queue, imu_queue, viewer=None):
self.config = config
self.viewer = viewer
self.img_queue = img_queue
self.imu_queue = imu_queue
self.feature_queue = Queue()
self.image_processor = ImageProcessor(config)
self.msckf = MSCKF(config)
self.img_thread = Thread(target=self.process_img)
self.imu_thread = Thread(target=self.process_imu)
self.vio_thread = Thread(target=self.process_feature)
self.img_thread.start()
self.imu_thread.start()
self.vio_thread.start()
def process_img(self):
while True:
img_msg = self.img_queue.get()
if img_msg is None:
self.feature_queue.put(None)
return
# print('img_msg', img_msg.timestamp)
if self.viewer is not None:
self.viewer.update_image(img_msg.cam0_image)
feature_msg = self.image_processor.stareo_callback(img_msg)
if feature_msg is not None:
self.feature_queue.put(feature_msg)
def process_imu(self):
while True:
imu_msg = self.imu_queue.get()
if imu_msg is None:
return
# print('imu_msg', imu_msg.timestamp)
self.image_processor.imu_callback(imu_msg)
self.msckf.imu_callback(imu_msg)
def process_feature(self):
while True:
feature_msg = self.feature_queue.get()
if feature_msg is None:
return
print('feature_msg', feature_msg.timestamp)
result = self.msckf.feature_callback(feature_msg)
if result is not None and self.viewer is not None:
self.viewer.update_pose(result.cam0_pose)
if __name__ == '__main__':
import time
import argparse
from dataset import EuRoCDataset, DataPublisher
from viewer import Viewer
parser = argparse.ArgumentParser()
parser.add_argument('--path', type=str, default='path/to/your/EuRoC_MAV_dataset/MH_01_easy',
help='Path of EuRoC MAV dataset.')
parser.add_argument('--view', action='store_true', help='Show trajectory.')
args = parser.parse_args()
if args.view:
viewer = Viewer()
else:
viewer = None
dataset = EuRoCDataset(args.path)
dataset.set_starttime(offset=40.) # start from static state
img_queue = Queue()
imu_queue = Queue()
# gt_queue = Queue()
config = ConfigEuRoC()
msckf_vio = VIO(config, img_queue, imu_queue, viewer=viewer)
duration = float('inf')
ratio = 0.4 # make it smaller if image processing and MSCKF computation is slow
imu_publisher = DataPublisher(
dataset.imu, imu_queue, duration, ratio)
img_publisher = DataPublisher(
dataset.stereo, img_queue, duration, ratio)
now = time.time()
imu_publisher.start(now)
img_publisher.start(now)