43
43
44
44
"""
45
45
import time
46
+ from micropython import const
46
47
47
48
__version__ = "0.0.0-auto.0"
48
49
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_GPS.git"
49
50
51
+
52
+ _GPSI2C_DEFAULT_ADDRESS = const (0x10 )
53
+
50
54
# Internal helper parsing functions.
51
55
# These handle input that might be none or null and return none instead of
52
56
# throwing errors.
53
-
54
-
55
57
def _parse_degrees (nmea_data ):
56
58
# Parse a NMEA lat/long data pair 'dddmm.mmmm' into a pure degrees value.
57
59
# Where ddd is the degrees, mm.mmmm is the minutes.
@@ -135,11 +137,11 @@ def update(self):
135
137
data_type , args = sentence
136
138
data_type = bytes (data_type .upper (), "ascii" )
137
139
# return sentence
138
- if data_type == b'GPGLL' : # GLL, Geographic Position – Latitude/Longitude
140
+ if data_type in ( b'GPGLL' , b'GNGGL' ) : # GLL, Geographic Position – Latitude/Longitude
139
141
self ._parse_gpgll (args )
140
- elif data_type == b'GPRMC' : # RMC, minimum location info
142
+ elif data_type in ( b'GPRMC' , b'GNRMC' ) : # RMC, minimum location info
141
143
self ._parse_gprmc (args )
142
- elif data_type == b'GPGGA' : # GGA, 3d location fix
144
+ elif data_type in ( b'GPGGA' , b'GNGGA' ) : # GGA, 3d location fix
143
145
self ._parse_gpgga (args )
144
146
return True
145
147
@@ -149,15 +151,15 @@ def send_command(self, command, add_checksum=True):
149
151
Note you should NOT add the leading $ and trailing * to the command
150
152
as they will automatically be added!
151
153
"""
152
- self ._uart . write (b'$' )
153
- self ._uart . write (command )
154
+ self .write (b'$' )
155
+ self .write (command )
154
156
if add_checksum :
155
157
checksum = 0
156
158
for char in command :
157
159
checksum ^= char
158
- self ._uart . write (b'*' )
159
- self ._uart . write (bytes ('{:02x}' .format (checksum ).upper (), "ascii" ))
160
- self ._uart . write (b'\r \n ' )
160
+ self .write (b'*' )
161
+ self .write (bytes ('{:02x}' .format (checksum ).upper (), "ascii" ))
162
+ self .write (b'\r \n ' )
161
163
162
164
@property
163
165
def has_fix (self ):
@@ -181,16 +183,36 @@ def nmea_sentence(self):
181
183
"""Return raw_sentence which is the raw NMEA sentence read from the GPS"""
182
184
return self ._raw_sentence
183
185
186
+ def read (self , num_bytes ):
187
+ """Read up to num_bytes of data from the GPS directly, without parsing.
188
+ Returns a bytearray with up to num_bytes or None if nothing was read"""
189
+ return self ._uart .read (num_bytes )
190
+
191
+ def write (self , bytestr ):
192
+ """Write a bytestring data to the GPS directly, without parsing
193
+ or checksums"""
194
+ return self ._uart .write (bytestr )
195
+
196
+ @property
197
+ def in_waiting (self ):
198
+ """Returns number of bytes available in UART read buffer"""
199
+ return self ._uart .in_waiting
200
+
201
+ def readline (self ):
202
+ """Returns a newline terminated bytearray, must have timeout set for
203
+ the underlying UART or this will block forever!"""
204
+ return self ._uart .readline ()
205
+
184
206
def _read_sentence (self ):
185
207
# Parse any NMEA sentence that is available.
186
208
# pylint: disable=len-as-condition
187
209
# This needs to be refactored when it can be tested.
188
210
189
211
# Only continue if we have at least 32 bytes in the input buffer
190
- if self ._uart . in_waiting < 32 :
212
+ if self .in_waiting < 32 :
191
213
return None
192
214
193
- sentence = self ._uart . readline ()
215
+ sentence = self .readline ()
194
216
if sentence is None or sentence == b'' or len (sentence ) < 1 :
195
217
return None
196
218
try :
@@ -423,3 +445,64 @@ def _parse_gpgsv(self, args):
423
445
except TypeError :
424
446
pass
425
447
self .satellites_prev = self .satellites
448
+
449
+ class GPS_GtopI2C (GPS ):
450
+ """GTop-compatible I2C GPS parsing module. Can parse simple NMEA data
451
+ sentences from an I2C-capable GPS module to read latitude, longitude, and more.
452
+ """
453
+ def __init__ (self , i2c_bus , * , address = _GPSI2C_DEFAULT_ADDRESS , debug = False ,
454
+ timeout = 5 ):
455
+ import adafruit_bus_device .i2c_device as i2c_device
456
+ super ().__init__ (None , debug ) # init the parent with no UART
457
+ self ._i2c = i2c_device .I2CDevice (i2c_bus , address )
458
+ self ._lastbyte = None
459
+ self ._charbuff = bytearray (1 )
460
+ self ._internalbuffer = []
461
+ self ._timeout = timeout
462
+
463
+ def read (self , num_bytes = 1 ):
464
+ """Read up to num_bytes of data from the GPS directly, without parsing.
465
+ Returns a bytearray with up to num_bytes or None if nothing was read"""
466
+ result = []
467
+ for _ in range (num_bytes ):
468
+ with self ._i2c as i2c :
469
+ # we read one byte at a time, verify it isnt part of a string of
470
+ # 'stuffed' newlines and then append to our result array for byteification
471
+ i2c .readinto (self ._charbuff )
472
+ char = self ._charbuff [0 ]
473
+ if (char == ord ('\n ' )) and (self ._lastbyte != ord ('\r ' )):
474
+ continue # skip duplicate \n's!
475
+ result .append (char )
476
+ self ._lastbyte = char # keep track of the last character approved
477
+ return bytearray (result )
478
+
479
+ def write (self , bytestr ):
480
+ """Write a bytestring data to the GPS directly, without parsing
481
+ or checksums"""
482
+ with self ._i2c as i2c :
483
+ i2c .write (bytestr )
484
+
485
+ @property
486
+ def in_waiting (self ):
487
+ """Returns number of bytes available in UART read buffer, always 32
488
+ since I2C does not have the ability to know how much data is available"""
489
+ return 32
490
+
491
+ def readline (self ):
492
+ """Returns a newline terminated bytearray, must have timeout set for
493
+ the underlying UART or this will block forever!"""
494
+ timeout = time .monotonic () + self ._timeout
495
+ while timeout > time .monotonic ():
496
+ # check if our internal buffer has a '\n' termination already
497
+ if self ._internalbuffer and (self ._internalbuffer [- 1 ] == ord ('\n ' )):
498
+ break
499
+ char = self .read (1 )
500
+ if not char :
501
+ continue
502
+ self ._internalbuffer .append (char [0 ])
503
+ #print(bytearray(self._internalbuffer))
504
+ if self ._internalbuffer and self ._internalbuffer [- 1 ] == ord ('\n ' ):
505
+ ret = bytearray (self ._internalbuffer )
506
+ self ._internalbuffer = [] # reset the buffer to empty
507
+ return ret
508
+ return None # no completed data yet
0 commit comments