-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgps_serial.py
234 lines (223 loc) · 11.2 KB
/
gps_serial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# The MIT License (MIT)
#
# Copyright (c) 2017 Damien P. George
# Copyright (c) 2019 Brendan Doherty
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
========================================
gps_serial
========================================
Yet another NMEA sentence parser for serial UART based GPS modules. This implements the threading module for [psuedo] asynchronous applications. CAUTION: The individual satelite info is being ignored until we decide to support capturing it from the GPS module's output.
"""
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/DVC-Viking-Robotics/GPS_Serial.git"
import time
import threading
from serial import Serial
DEFAULT_LOC = {'lat': 37.96713657090229, 'lng': -122.0712176165581}
"""The default/fallback location to use when waiting for a fix upon power-up
of GPS device. This has been hard-coded to DVC Engineering buildings'
courtyard."""
def _convert2deg(nmea):
"""VERY IMPORTANT needed to go from format 'ddmm.mmmm' into decimal degrees"""
if nmea is None or len(nmea) < 3:
return None
nmea = float(nmea)
return (nmea // 100) + (nmea - ((nmea // 100) * 100)) / 60
class GPSserial:
"""
:param int address: The serial port address that the GPS module is connected to. For example, on the raspberry pi's GPIO pins, this is ``/dev/ttyS0``; on windows, this is something like ``com#`` where # is designated by windows.
:param int timeout: Specific number of seconds till the threading :class:`~serial.Serial`'s `~serial.Serial.read_until()` operation expires. Defaults to 1 second.
:param int baud: The specific baudrate to be used for the serial connection. If left
"""
def __init__(self, address, timeout=1.0, baud=9600):
self._ser = Serial(address=address, baud=baud, timeout=timeout)
# print('Successfully opened port {} @ {} to Arduino device'.format(address, baud))
self._line = self._ser.read_until() # discard any garbage artifacts
self._ser.close()
self._gps_thread = None
# print('Successfully opened port', address, 'to GPS module')
self._lat = DEFAULT_LOC['lat']
self._lng = DEFAULT_LOC['lng']
self._utc = None
self._line = ""
self._speed = {"knots": 0.0, "kmph": 0.0}
self._course = {"true": 0.0, "mag": 0.0}
self._sat = {"connected": 0, "view": 0, "quality": "Fix Unavailable"}
self._altitude = 0.0
# self.azimuth = 0.0
# self.elevation = 0.0
self._data_status = 'Data not valid'
self._fix = "no Fix"
self._rx_status = "unknown"
self._pdop = 0.0
self._hdop = 0.0
self._vdop = 0.0
@property
def lat(self):
"""This attribute holds the latitude coordinate that was most recently parsed from the GPS module's data output."""
return self._lat
@property
def lng(self):
"""This attribute holds the longitude coordinate that was most recently parsed from the GPS module's data output."""
return self._lng
@property
def utc(self):
"""This attribute holds a tuple of time & date data that was most recently parsed from the GPS module's data output. This tuple conforms with python's time module functions."""
return self._utc
@property
def speed_knots(self):
"""This attribute holds the speed (in nautical knots) that was most recently parsed from the GPS module's data output."""
return self._speed['knots']
@property
def speed_kmph(self):
"""This attribute holds the speed (in kilometers per hour) that was most recently parsed from the GPS module's data output."""
return self._speed['kmph']
@property
def sat_connected(self):
"""This attribute holds the number of connected GPS satelites that was most recently parsed from the GPS module's data output."""
return self._sat['connected']
@property
def sat_view(self):
"""This attribute holds the number of GPS satelites in the module's view that was most recently parsed from the GPS module's data output."""
return self._sat['view']
@property
def sat_quality(self):
"""This attribute holds the description of the GPS satelites' quality that was most recently parsed from the GPS module's data output."""
return self._sat['quality']
@property
def course_true(self):
"""This attribute holds the course direction (in terms of "true north") that was most recently parsed from the GPS module's data output."""
return self._course['true']
@property
def course_mag(self):
"""This attribute holds the course direction (in terms of "magnetic north") that was most recently parsed from the GPS module's data output."""
return self._course['mag']
@property
def altitude(self):
"""This attribute holds the GPS antenna's altitude that was most recently parsed from the GPS module's data output."""
return self._altitude
@property
def fix(self):
"""This attribute holds the description of GPS module's fix quality that was most recently parsed from the GPS module's data output."""
return self._fix
@property
def data_status(self):
"""This attribute holds the GPS module's data authenticity that was most recently parsed from the GPS module's data output."""
return self._data_status
@property
def rx_status(self):
"""This attribute holds the GPS module's receiving status that was most recently parsed from the GPS module's data output."""
return self._rx_status
@property
def pdop(self):
"""This attribute holds the GPS module's positional dilution of percision that was most recently parsed from the GPS module's data output."""
return self._pdop
@property
def vdop(self):
"""This attribute holds the GPS module's vertical dilution of percision that was most recently parsed from the GPS module's data output."""
return self._vdop
@property
def hdop(self):
"""This attribute holds the GPS module's horizontal dilution of percision that was most recently parsed from the GPS module's data output."""
return self._hdop
def _parse_line(self, string):
found = False
if string.find('GLL') != -1:
found = True
arr = string.rsplit(',')[1:]
# it would probably be helpful to other location-based APIs to have the
# corrdinates also saved in the original 'DDMM.SS [cardinal direction]'
self._lat = _convert2deg(arr[0])
if arr[1] != 'N' and arr[1] is not None:
self._lat *= -1
self._lng = _convert2deg(arr[2])
if arr[3] != 'E' and arr[3] is not None:
self._lng *= -1.0
type_state = {'A': 'data valid', 'V': 'Data not valid'}
self._data_status = type_state[arr[5]]
elif string.find('VTG') != -1:
arr = string.rsplit(',')[1:]
if len(arr[0]) > 1:
self._course["true"] = float(arr[0])
if len(arr[1]) > 1:
self._course["mag"] = float(arr[1])
if len(arr[2]) > 1:
self._speed["knots"] = float(arr[2])
if len(arr[3]) > 1:
self._speed["kmph"] = float(arr[3])
elif string.find('GGA') != -1:
type_state = [
"Fix Unavailable", "Valid Fix (SPS)", "Valid Fix (GPS)", "unknown1", "unknown2", "unknown3"]
arr = string.rsplit(',')[1:]
self._sat["quality"] = type_state[int(arr[5])]
self._sat["view"] = int(arr[6])
if len(arr[8]) > 1:
self._altitude = float(arr[8])
elif string.find('GSA') != -1:
arr = string.rsplit(',')[1:]
type_fix = ["No Fix", "2D", "3D"]
self._fix = type_fix[int(arr[1]) - 1]
self._pdop = float(arr[14])
self._hdop = float(arr[15])
self._vdop = float(arr[16][:-3])
elif string.find('RMC') != -1:
status = {"V": "Warning", "A": "Valid"}
arr = string.rsplit(',')[1:]
self._rx_status = status[arr[1]]
if len(arr[0]) > 1 and len(arr[8]) > 1:
self._utc = time.struct_time((2000+int(arr[8][4:6]), int(arr[8][2:4]), int(arr[8][0:2]), int(arr[0][0:2]), int(arr[0][2:4]), int(arr[0][4:6]), 0, 0, -1))
elif string.find('GSV') != -1:
arr = string.rsplit(',')[1:]
self._sat['connected'] = arr[0]
# ignoring data specific to individual satelites
# self.elevation = int(arr[4])
# self.azimuth = int(arr[5])
# print('sat["view"]:', self.sat["connected"], 'elevation:', self.elevation, 'Azimuth:', self.azimuth)
return found
def _threaded_read(self, raw):
with self._ser as ser:
found = False
while ser.in_waiting or not found:
self._line = ser.read_until()
try:
self._line = str(self._line, 'ascii').strip()
except UnicodeError:
continue # there was undecernable garbage data that couldn't get encoded to ASCII
if raw:
print(self._line)
# found = true if gps coordinates are captured
found = self._parse_line(self._line)
def get_data(self, raw=False):
"""
This function only starts the process of parsing the data from a GPS module (if any).
:param bool raw: `True` prints the raw data being parsed from the GPS module. `False` doesn't print the raw data. Defaults to `False`.
:returns: the last latitude and longitude coordinates obtained from either object instantiation (`DEFAULT_LOC` values) or previously completed parsing of GPS data.
"""
if self._gps_thread is not None and not self._gps_thread.is_alive():
self._gps_thread.join()
self._gps_thread = threading.Thread(
target=self._threaded_read, args=[raw])
self._gps_thread.start()
elif self._gps_thread is None:
self._gps_thread = threading.Thread(
target=self._threaded_read, args=[raw])
self._gps_thread.start()
return {"lat": self.lat, "lng": self.lng}