-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmission_protocol.py
144 lines (105 loc) · 4.1 KB
/
mission_protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Implement the MAVLink mission protocol
"""
import os
import time
from pymavlink.dialects.v20 import ardupilotmega as apm2
# Use MAVLink2 wire protocol, must include this before importing pymavlink.mavutil
os.environ['MAVLINK20'] = '1'
from pymavlink import mavutil, mavwp
def upload_using_mission_protocol(conn, mission_type, items) -> bool:
start = time.time()
# Start mission protocol by sending the count of items
conn.mav.mission_count_send(1, 1, len(items), mission_type)
remaining_to_send = set(range(0, len(items)))
sent = set()
timeout = (10 + len(items) / 10.0)
while True:
if time.time() - start > timeout:
print('Timeout uploading mission')
return False
if len(remaining_to_send) == 0:
print("All sent, waiting for MISSION_ACK")
break
# Wait for a MISSION_REQUEST
m = conn.recv_match(type=['MISSION_REQUEST', 'MISSION_ACK'], blocking=True, timeout=1)
if m is None:
continue
if m.get_type() == 'MISSION_ACK':
if m.target_system == 255 and m.target_component == 0 and m.type == 1 and m.mission_type == 0:
print('MAVProxy is messing with us')
continue
else:
print(f'Unexpected MISSION_ACK {str(m)}')
return False
print(f'Item {m.seq}/{len(items) - 1} requested')
print(f'Getting ready to send ({str(items[m.seq])})')
if m.seq in sent:
print('Duplicate request, continue')
continue
if m.seq not in remaining_to_send:
print('Item already sent? Or we never had it?')
return False
if m.mission_type != mission_type:
print('Request has wrong mission type')
return False
if items[m.seq].mission_type != mission_type:
print('Input has wrong mission type')
return False
if items[m.seq].target_system != 1:
print('Input has wrong target system')
return False
if items[m.seq].target_component != 1:
print('Input has wrong target component')
return False
if items[m.seq].seq != m.seq:
print(f'Input has wrong sequence number ({items[m.seq].seq} vs {m.seq})')
return False
# Pack and send item
items[m.seq].pack(conn.mav)
conn.mav.send(items[m.seq])
remaining_to_send.discard(m.seq)
sent.add(m.seq)
timeout += 10 # we received a good request for item; be generous with our timeouts
m = conn.recv_match(type='MISSION_ACK', blocking=True, timeout=1)
if m is None:
print('Timeout waiting for MISSION_ACK')
return False
if m.mission_type != mission_type:
print("MISSION_ACK has wrong mission_type")
return False
if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED:
print(f'Mission upload failed {mavutil.mavlink.enums["MAV_MISSION_RESULT"][m.type].name}')
return False
print(f'Upload of all {len(items)} items succeeded')
return True
def wp_to_mission_item_int(wp):
"""Convert a MISSION_ITEM to a MISSION_ITEM_INT"""
if wp.get_type() == 'MISSION_ITEM_INT':
return wp
wp_int = mavutil.mavlink.MAVLink_mission_item_int_message(
wp.target_system,
wp.target_component,
wp.seq,
wp.frame,
wp.command,
wp.current,
wp.autocontinue,
wp.param1,
wp.param2,
wp.param3,
wp.param4,
int(wp.x * 1.0e7),
int(wp.y * 1.0e7),
wp.z)
return wp_int
def mission_from_path(path):
# mavwp can read the "mission plain-text file format" described here:
# https://mavlink.io/en/file_formats/
waypoint_loader = mavwp.MAVWPLoader(target_system=1, target_component=1)
waypoint_loader.load(path)
return [wp_to_mission_item_int(x) for x in waypoint_loader.wpoints]
# TODO take SimClock
def upload_mission(conn, path) -> bool:
waypoints = mission_from_path(path)
return upload_using_mission_protocol(conn, apm2.MAV_MISSION_TYPE_MISSION, waypoints)