24
24
https://github.com/adafruit/circuitpython/releases
25
25
26
26
"""
27
+ from __future__ import annotations
27
28
29
+ try :
30
+ from typing import Tuple , Union , Optional
31
+ from busio import UART
32
+ from serial import Serial
33
+ except ImportError :
34
+ pass
28
35
29
36
import time
30
37
import struct
36
43
class RockBlock :
37
44
"""Driver for RockBLOCK Iridium satellite modem."""
38
45
39
- def __init__ (self , uart , baudrate = 19200 ):
46
+ def __init__ (self , uart : Union [ UART , Serial ], baudrate : int = 19200 ) -> None :
40
47
self ._uart = uart
41
48
self ._uart .baudrate = baudrate
42
49
self ._buf_out = None
43
50
self .reset ()
44
51
45
- def _uart_xfer (self , cmd ) :
52
+ def _uart_xfer (self , cmd : str ) -> Tuple [ bytes , ...] :
46
53
"""Send AT command and return response as tuple of lines read."""
47
54
self ._uart .reset_input_buffer ()
48
55
self ._uart .write (str .encode ("AT" + cmd + "\r " ))
@@ -58,22 +65,22 @@ def _uart_xfer(self, cmd):
58
65
59
66
return tuple (resp )
60
67
61
- def reset (self ):
68
+ def reset (self ) -> None :
62
69
"""Perform a software reset."""
63
70
self ._uart_xfer ("&F0" ) # factory defaults
64
71
self ._uart_xfer ("&K0" ) # flow control off
65
72
66
- def _transfer_buffer (self ):
73
+ def _transfer_buffer (self ) -> None :
67
74
"""Copy out buffer to in buffer to simulate receiving a message."""
68
75
self ._uart_xfer ("+SBDTC" )
69
76
70
77
@property
71
- def data_out (self ):
78
+ def data_out (self ) -> Optional [ bytes ] :
72
79
"""The binary data in the outbound buffer."""
73
80
return self ._buf_out
74
81
75
82
@data_out .setter
76
- def data_out (self , buf ) :
83
+ def data_out (self , buf : bytes ) -> None :
77
84
if buf is None :
78
85
# clear the buffer
79
86
resp = self ._uart_xfer ("+SBDD0" )
@@ -100,7 +107,7 @@ def data_out(self, buf):
100
107
self ._buf_out = buf
101
108
102
109
@property
103
- def text_out (self ):
110
+ def text_out (self ) -> Optional [ str ] :
104
111
"""The text in the outbound buffer."""
105
112
text = None
106
113
# TODO: add better check for non-text in buffer
@@ -112,15 +119,15 @@ def text_out(self):
112
119
return text
113
120
114
121
@text_out .setter
115
- def text_out (self , text ) :
122
+ def text_out (self , text : str ) -> None :
116
123
if not isinstance (text , str ):
117
124
raise ValueError ("Only strings allowed." )
118
125
if len (text ) > 120 :
119
126
raise ValueError ("Text size limited to 120 bytes." )
120
127
self .data_out = str .encode (text )
121
128
122
129
@property
123
- def data_in (self ):
130
+ def data_in (self ) -> Optional [ bytes ] :
124
131
"""The binary data in the inbound buffer."""
125
132
data = None
126
133
if self .status [2 ] == 1 :
@@ -130,7 +137,7 @@ def data_in(self):
130
137
return data
131
138
132
139
@data_in .setter
133
- def data_in (self , buf ) :
140
+ def data_in (self , buf : bytes ) -> None :
134
141
if buf is not None :
135
142
raise ValueError ("Can only set in buffer to None to clear." )
136
143
resp = self ._uart_xfer ("+SBDD1" )
@@ -139,7 +146,7 @@ def data_in(self, buf):
139
146
raise RuntimeError ("Error clearing buffer." )
140
147
141
148
@property
142
- def text_in (self ):
149
+ def text_in (self ) -> Optional [ str ] :
143
150
"""The text in the inbound buffer."""
144
151
text = None
145
152
if self .status [2 ] == 1 :
@@ -151,10 +158,10 @@ def text_in(self):
151
158
return text
152
159
153
160
@text_in .setter
154
- def text_in (self , text ) :
161
+ def text_in (self , text : bytes ) -> None :
155
162
self .data_in = text
156
163
157
- def satellite_transfer (self , location = None ):
164
+ def satellite_transfer (self , location : str = None ) -> Tuple [ Optional [ int ], ...] :
158
165
"""Initiate a Short Burst Data transfer with satellites."""
159
166
status = (None ,) * 6
160
167
if location :
@@ -170,7 +177,7 @@ def satellite_transfer(self, location=None):
170
177
return tuple (status )
171
178
172
179
@property
173
- def status (self ):
180
+ def status (self ) -> Tuple [ Optional [ int ], ...] :
174
181
"""Return tuple of Short Burst Data status."""
175
182
resp = self ._uart_xfer ("+SBDSX" )
176
183
if resp [- 1 ].strip ().decode () == "OK" :
@@ -179,27 +186,27 @@ def status(self):
179
186
return (None ,) * 6
180
187
181
188
@property
182
- def model (self ):
189
+ def model (self ) -> Optional [ str ] :
183
190
"""Return modem model."""
184
191
resp = self ._uart_xfer ("+GMM" )
185
192
if resp [- 1 ].strip ().decode () == "OK" :
186
193
return resp [1 ].strip ().decode ()
187
194
return None
188
195
189
196
@property
190
- def serial_number (self ):
197
+ def serial_number (self ) -> Optional [ str ] :
191
198
"""Modem's serial number, also known as the modem's IMEI.
192
199
193
200
Returns
194
- string
201
+ str | None
195
202
"""
196
203
resp = self ._uart_xfer ("+CGSN" )
197
204
if resp [- 1 ].strip ().decode () == "OK" :
198
205
return resp [1 ].strip ().decode ()
199
206
return None
200
207
201
208
@property
202
- def signal_quality (self ):
209
+ def signal_quality (self ) -> Optional [ int ] :
203
210
"""Signal Quality also known as the Received Signal Strength Indicator (RSSI).
204
211
205
212
Values returned are 0 to 5, where 0 is no signal (0 bars) and 5 is strong signal (5 bars).
@@ -217,7 +224,7 @@ def signal_quality(self):
217
224
return None
218
225
219
226
@property
220
- def revision (self ):
227
+ def revision (self ) -> Tuple [ Optional [ str ], ...] :
221
228
"""Modem's internal component firmware revisions.
222
229
223
230
For example: Call Processor Version, Modem DSP Version, DBB Version (ASIC),
@@ -237,7 +244,7 @@ def revision(self):
237
244
return (None ,) * 7
238
245
239
246
@property
240
- def ring_alert (self ):
247
+ def ring_alert (self ) -> Optional [ bool ] :
241
248
"""The current ring indication mode.
242
249
243
250
False means Ring Alerts are disabled, and True means Ring Alerts are enabled.
@@ -255,7 +262,7 @@ def ring_alert(self):
255
262
return None
256
263
257
264
@ring_alert .setter
258
- def ring_alert (self , value ) :
265
+ def ring_alert (self , value : Union [ int , bool ]) -> Optional [ bool ] :
259
266
if value in (True , False ):
260
267
resp = self ._uart_xfer ("+SBDMTA=" + str (int (value )))
261
268
if resp [- 1 ].strip ().decode () == "OK" :
@@ -266,7 +273,7 @@ def ring_alert(self, value):
266
273
)
267
274
268
275
@property
269
- def ring_indication (self ):
276
+ def ring_indication (self ) -> Tuple [ Optional [ str ], ...] :
270
277
"""The ring indication status.
271
278
272
279
Returns the reason for the most recent assertion of the Ring Indicate signal.
@@ -294,7 +301,9 @@ def ring_indication(self):
294
301
return (None ,) * 2
295
302
296
303
@property
297
- def geolocation (self ):
304
+ def geolocation (
305
+ self ,
306
+ ) -> Union [Tuple [int , int , int , time .struct_time ], Tuple [None , None , None , None ]]:
298
307
"""Most recent geolocation of the modem as measured by the Iridium constellation
299
308
including a timestamp of when geolocation measurement was made.
300
309
@@ -313,20 +322,20 @@ def geolocation(self):
313
322
This geolocation coordinate system is known as ECEF (acronym earth-centered, earth-fixed),
314
323
also known as ECR (initialism for earth-centered rotational)
315
324
316
- <timestamp> is a time_struct
325
+ <timestamp> is a time.struct_time
317
326
The timestamp is assigned by the modem when the geolocation grid code received from
318
327
the network is stored to the modem's internal memory.
319
328
320
329
The timestamp used by the modem is Iridium system time, which is a running count of
321
330
90 millisecond intervals, since Sunday May 11, 2014, at 14:23:55 UTC (the most recent
322
331
Iridium epoch).
323
332
The timestamp returned by the modem is a 32-bit integer displayed in hexadecimal form.
324
- We convert the modem's timestamp and return it as a time_struct .
333
+ We convert the modem's timestamp and return it as a time.struct_time .
325
334
326
335
The system time value is always expressed in UTC time.
327
336
328
337
Returns a tuple:
329
- (int, int, int, time_struct )
338
+ (int, int, int, time.struct_time )
330
339
"""
331
340
resp = self ._uart_xfer ("-MSGEO" )
332
341
if resp [- 1 ].strip ().decode () == "OK" :
@@ -358,7 +367,7 @@ def geolocation(self):
358
367
return (None ,) * 4
359
368
360
369
@property
361
- def system_time (self ):
370
+ def system_time (self ) -> Optional [ time . struct_time ] :
362
371
"""Current date and time as given by the Iridium network.
363
372
364
373
The system time is available and valid only after the ISU has registered with
@@ -371,12 +380,12 @@ def system_time(self):
371
380
90 millisecond intervals, since Sunday May 11, 2014, at 14:23:55 UTC (the most recent
372
381
Iridium epoch).
373
382
The timestamp returned by the modem is a 32-bit integer displayed in hexadecimal form.
374
- We convert the modem's timestamp and return it as a time_struct .
383
+ We convert the modem's timestamp and return it as a time.struct_time .
375
384
376
385
The system time value is always expressed in UTC time.
377
386
378
387
Returns:
379
- time_struct
388
+ time.struct_time
380
389
"""
381
390
resp = self ._uart_xfer ("-MSSTM" )
382
391
if resp [- 1 ].strip ().decode () == "OK" :
@@ -405,7 +414,7 @@ def system_time(self):
405
414
return None
406
415
407
416
@property
408
- def energy_monitor (self ):
417
+ def energy_monitor (self ) -> Optional [ int ] :
409
418
"""The current accumulated energy usage estimate in microamp hours.
410
419
411
420
Returns an estimate of the charge taken from the +5V supply to the modem,
@@ -436,7 +445,7 @@ def energy_monitor(self):
436
445
return None
437
446
438
447
@energy_monitor .setter
439
- def energy_monitor (self , value ) :
448
+ def energy_monitor (self , value : int ) -> Optional [ int ] :
440
449
if 0 <= value <= 67108863 : # 0 to 2^26 - 1
441
450
resp = self ._uart_xfer ("+GEMON=" + str (value ))
442
451
if resp [- 1 ].strip ().decode () == "OK" :
0 commit comments