Skip to content

Commit 621610f

Browse files
first pass at annotations
1 parent e981244 commit 621610f

File tree

1 file changed

+33
-27
lines changed

1 file changed

+33
-27
lines changed

adafruit_rockblock.py

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@
2424
https://github.com/adafruit/circuitpython/releases
2525
2626
"""
27-
27+
try:
28+
from typing import Tuple, List, Union, Optional
29+
from board import UART
30+
from serial import Serial
31+
from time import time_struct
32+
except ImportError:
33+
pass
2834

2935
import time
3036
import struct
@@ -36,13 +42,13 @@
3642
class RockBlock:
3743
"""Driver for RockBLOCK Iridium satellite modem."""
3844

39-
def __init__(self, uart, baudrate=19200):
45+
def __init__(self, uart: Union[UART, Serial], baudrate: int = 19200) -> None:
4046
self._uart = uart
4147
self._uart.baudrate = baudrate
4248
self._buf_out = None
4349
self.reset()
4450

45-
def _uart_xfer(self, cmd):
51+
def _uart_xfer(self, cmd: str) -> Tuple[List[bytes]]:
4652
"""Send AT command and return response as tuple of lines read."""
4753
self._uart.reset_input_buffer()
4854
self._uart.write(str.encode("AT" + cmd + "\r"))
@@ -58,22 +64,22 @@ def _uart_xfer(self, cmd):
5864

5965
return tuple(resp)
6066

61-
def reset(self):
67+
def reset(self) -> None:
6268
"""Perform a software reset."""
6369
self._uart_xfer("&F0") # factory defaults
6470
self._uart_xfer("&K0") # flow control off
6571

66-
def _transfer_buffer(self):
72+
def _transfer_buffer(self) -> None:
6773
"""Copy out buffer to in buffer to simulate receiving a message."""
6874
self._uart_xfer("+SBDTC")
6975

7076
@property
71-
def data_out(self):
77+
def data_out(self) -> None:
7278
"""The binary data in the outbound buffer."""
7379
return self._buf_out
7480

7581
@data_out.setter
76-
def data_out(self, buf):
82+
def data_out(self, buf: bytes) -> None:
7783
if buf is None:
7884
# clear the buffer
7985
resp = self._uart_xfer("+SBDD0")
@@ -100,7 +106,7 @@ def data_out(self, buf):
100106
self._buf_out = buf
101107

102108
@property
103-
def text_out(self):
109+
def text_out(self) -> Optional[str]:
104110
"""The text in the outbound buffer."""
105111
text = None
106112
# TODO: add better check for non-text in buffer
@@ -112,15 +118,15 @@ def text_out(self):
112118
return text
113119

114120
@text_out.setter
115-
def text_out(self, text):
121+
def text_out(self, text: str) -> None:
116122
if not isinstance(text, str):
117123
raise ValueError("Only strings allowed.")
118124
if len(text) > 120:
119125
raise ValueError("Text size limited to 120 bytes.")
120126
self.data_out = str.encode(text)
121127

122128
@property
123-
def data_in(self):
129+
def data_in(self) -> Optional[List[bytes]]:
124130
"""The binary data in the inbound buffer."""
125131
data = None
126132
if self.status[2] == 1:
@@ -130,7 +136,7 @@ def data_in(self):
130136
return data
131137

132138
@data_in.setter
133-
def data_in(self, buf):
139+
def data_in(self, buf: bytes) -> None:
134140
if buf is not None:
135141
raise ValueError("Can only set in buffer to None to clear.")
136142
resp = self._uart_xfer("+SBDD1")
@@ -139,7 +145,7 @@ def data_in(self, buf):
139145
raise RuntimeError("Error clearing buffer.")
140146

141147
@property
142-
def text_in(self):
148+
def text_in(self) -> Optional[str]:
143149
"""The text in the inbound buffer."""
144150
text = None
145151
if self.status[2] == 1:
@@ -151,10 +157,10 @@ def text_in(self):
151157
return text
152158

153159
@text_in.setter
154-
def text_in(self, text):
160+
def text_in(self, text: str) -> None:
155161
self.data_in = text
156162

157-
def satellite_transfer(self, location=None):
163+
def satellite_transfer(self, location: str = None) -> Tuple[Optional[str], ...]:
158164
"""Initiate a Short Burst Data transfer with satellites."""
159165
status = (None,) * 6
160166
if location:
@@ -170,7 +176,7 @@ def satellite_transfer(self, location=None):
170176
return tuple(status)
171177

172178
@property
173-
def status(self):
179+
def status(self) -> Tuple[Optional[str], ...]:
174180
"""Return tuple of Short Burst Data status."""
175181
resp = self._uart_xfer("+SBDSX")
176182
if resp[-1].strip().decode() == "OK":
@@ -179,27 +185,27 @@ def status(self):
179185
return (None,) * 6
180186

181187
@property
182-
def model(self):
188+
def model(self) -> Optional[str]:
183189
"""Return modem model."""
184190
resp = self._uart_xfer("+GMM")
185191
if resp[-1].strip().decode() == "OK":
186192
return resp[1].strip().decode()
187193
return None
188194

189195
@property
190-
def serial_number(self):
196+
def serial_number(self) -> Optional[str]:
191197
"""Modem's serial number, also known as the modem's IMEI.
192198
193199
Returns
194-
string
200+
str | None
195201
"""
196202
resp = self._uart_xfer("+CGSN")
197203
if resp[-1].strip().decode() == "OK":
198204
return resp[1].strip().decode()
199205
return None
200206

201207
@property
202-
def signal_quality(self):
208+
def signal_quality(self) -> int:
203209
"""Signal Quality also known as the Received Signal Strength Indicator (RSSI).
204210
205211
Values returned are 0 to 5, where 0 is no signal (0 bars) and 5 is strong signal (5 bars).
@@ -217,7 +223,7 @@ def signal_quality(self):
217223
return None
218224

219225
@property
220-
def revision(self):
226+
def revision(self) -> Tuple[ Optional[str], ...]:
221227
"""Modem's internal component firmware revisions.
222228
223229
For example: Call Processor Version, Modem DSP Version, DBB Version (ASIC),
@@ -237,7 +243,7 @@ def revision(self):
237243
return (None,) * 7
238244

239245
@property
240-
def ring_alert(self):
246+
def ring_alert(self) -> Optional[bool]:
241247
"""The current ring indication mode.
242248
243249
False means Ring Alerts are disabled, and True means Ring Alerts are enabled.
@@ -255,7 +261,7 @@ def ring_alert(self):
255261
return None
256262

257263
@ring_alert.setter
258-
def ring_alert(self, value):
264+
def ring_alert(self, value: Union[int, bool]) -> Optional[bool]:
259265
if value in (True, False):
260266
resp = self._uart_xfer("+SBDMTA=" + str(int(value)))
261267
if resp[-1].strip().decode() == "OK":
@@ -266,7 +272,7 @@ def ring_alert(self, value):
266272
)
267273

268274
@property
269-
def ring_indication(self):
275+
def ring_indication(self) -> Tuple[ Optional[str], ...]:
270276
"""The ring indication status.
271277
272278
Returns the reason for the most recent assertion of the Ring Indicate signal.
@@ -294,7 +300,7 @@ def ring_indication(self):
294300
return (None,) * 2
295301

296302
@property
297-
def geolocation(self):
303+
def geolocation(self) -> Union[Tuple[int, int, int, time_struct], Tuple[None, None, None, None]]:
298304
"""Most recent geolocation of the modem as measured by the Iridium constellation
299305
including a timestamp of when geolocation measurement was made.
300306
@@ -358,7 +364,7 @@ def geolocation(self):
358364
return (None,) * 4
359365

360366
@property
361-
def system_time(self):
367+
def system_time(self) -> Optional[time_struct]:
362368
"""Current date and time as given by the Iridium network.
363369
364370
The system time is available and valid only after the ISU has registered with
@@ -405,7 +411,7 @@ def system_time(self):
405411
return None
406412

407413
@property
408-
def energy_monitor(self):
414+
def energy_monitor(self) -> Optional[int]:
409415
"""The current accumulated energy usage estimate in microamp hours.
410416
411417
Returns an estimate of the charge taken from the +5V supply to the modem,
@@ -436,7 +442,7 @@ def energy_monitor(self):
436442
return None
437443

438444
@energy_monitor.setter
439-
def energy_monitor(self, value):
445+
def energy_monitor(self, value: int) -> Optional[int]:
440446
if 0 <= value <= 67108863: # 0 to 2^26 - 1
441447
resp = self._uart_xfer("+GEMON=" + str(value))
442448
if resp[-1].strip().decode() == "OK":

0 commit comments

Comments
 (0)