diff --git a/adafruit_rockblock.py b/adafruit_rockblock.py index 3cef277..c2ab5c9 100644 --- a/adafruit_rockblock.py +++ b/adafruit_rockblock.py @@ -24,7 +24,14 @@ https://github.com/adafruit/circuitpython/releases """ +from __future__ import annotations +try: + from typing import Tuple, Union, Optional + from busio import UART + from serial import Serial +except ImportError: + pass import time import struct @@ -36,13 +43,13 @@ class RockBlock: """Driver for RockBLOCK Iridium satellite modem.""" - def __init__(self, uart, baudrate=19200): + def __init__(self, uart: Union[UART, Serial], baudrate: int = 19200) -> None: self._uart = uart self._uart.baudrate = baudrate self._buf_out = None self.reset() - def _uart_xfer(self, cmd): + def _uart_xfer(self, cmd: str) -> Tuple[bytes, ...]: """Send AT command and return response as tuple of lines read.""" self._uart.reset_input_buffer() self._uart.write(str.encode("AT" + cmd + "\r")) @@ -58,22 +65,22 @@ def _uart_xfer(self, cmd): return tuple(resp) - def reset(self): + def reset(self) -> None: """Perform a software reset.""" self._uart_xfer("&F0") # factory defaults self._uart_xfer("&K0") # flow control off - def _transfer_buffer(self): + def _transfer_buffer(self) -> None: """Copy out buffer to in buffer to simulate receiving a message.""" self._uart_xfer("+SBDTC") @property - def data_out(self): + def data_out(self) -> Optional[bytes]: """The binary data in the outbound buffer.""" return self._buf_out @data_out.setter - def data_out(self, buf): + def data_out(self, buf: bytes) -> None: if buf is None: # clear the buffer resp = self._uart_xfer("+SBDD0") @@ -100,7 +107,7 @@ def data_out(self, buf): self._buf_out = buf @property - def text_out(self): + def text_out(self) -> Optional[str]: """The text in the outbound buffer.""" text = None # TODO: add better check for non-text in buffer @@ -112,7 +119,7 @@ def text_out(self): return text @text_out.setter - def text_out(self, text): + def text_out(self, text: str) -> None: if not isinstance(text, str): raise ValueError("Only strings allowed.") if len(text) > 120: @@ -120,7 +127,7 @@ def text_out(self, text): self.data_out = str.encode(text) @property - def data_in(self): + def data_in(self) -> Optional[bytes]: """The binary data in the inbound buffer.""" data = None if self.status[2] == 1: @@ -130,7 +137,7 @@ def data_in(self): return data @data_in.setter - def data_in(self, buf): + def data_in(self, buf: bytes) -> None: if buf is not None: raise ValueError("Can only set in buffer to None to clear.") resp = self._uart_xfer("+SBDD1") @@ -139,7 +146,7 @@ def data_in(self, buf): raise RuntimeError("Error clearing buffer.") @property - def text_in(self): + def text_in(self) -> Optional[str]: """The text in the inbound buffer.""" text = None if self.status[2] == 1: @@ -151,10 +158,10 @@ def text_in(self): return text @text_in.setter - def text_in(self, text): + def text_in(self, text: bytes) -> None: self.data_in = text - def satellite_transfer(self, location=None): + def satellite_transfer(self, location: str = None) -> Tuple[Optional[int], ...]: """Initiate a Short Burst Data transfer with satellites.""" status = (None,) * 6 if location: @@ -170,7 +177,7 @@ def satellite_transfer(self, location=None): return tuple(status) @property - def status(self): + def status(self) -> Tuple[Optional[int], ...]: """Return tuple of Short Burst Data status.""" resp = self._uart_xfer("+SBDSX") if resp[-1].strip().decode() == "OK": @@ -179,7 +186,7 @@ def status(self): return (None,) * 6 @property - def model(self): + def model(self) -> Optional[str]: """Return modem model.""" resp = self._uart_xfer("+GMM") if resp[-1].strip().decode() == "OK": @@ -187,11 +194,11 @@ def model(self): return None @property - def serial_number(self): + def serial_number(self) -> Optional[str]: """Modem's serial number, also known as the modem's IMEI. Returns - string + str | None """ resp = self._uart_xfer("+CGSN") if resp[-1].strip().decode() == "OK": @@ -199,7 +206,7 @@ def serial_number(self): return None @property - def signal_quality(self): + def signal_quality(self) -> Optional[int]: """Signal Quality also known as the Received Signal Strength Indicator (RSSI). Values returned are 0 to 5, where 0 is no signal (0 bars) and 5 is strong signal (5 bars). @@ -217,7 +224,7 @@ def signal_quality(self): return None @property - def revision(self): + def revision(self) -> Tuple[Optional[str], ...]: """Modem's internal component firmware revisions. For example: Call Processor Version, Modem DSP Version, DBB Version (ASIC), @@ -237,7 +244,7 @@ def revision(self): return (None,) * 7 @property - def ring_alert(self): + def ring_alert(self) -> Optional[bool]: """The current ring indication mode. False means Ring Alerts are disabled, and True means Ring Alerts are enabled. @@ -255,7 +262,7 @@ def ring_alert(self): return None @ring_alert.setter - def ring_alert(self, value): + def ring_alert(self, value: Union[int, bool]) -> Optional[bool]: if value in (True, False): resp = self._uart_xfer("+SBDMTA=" + str(int(value))) if resp[-1].strip().decode() == "OK": @@ -266,7 +273,7 @@ def ring_alert(self, value): ) @property - def ring_indication(self): + def ring_indication(self) -> Tuple[Optional[str], ...]: """The ring indication status. Returns the reason for the most recent assertion of the Ring Indicate signal. @@ -294,7 +301,9 @@ def ring_indication(self): return (None,) * 2 @property - def geolocation(self): + def geolocation( + self, + ) -> Union[Tuple[int, int, int, time.struct_time], Tuple[None, None, None, None]]: """Most recent geolocation of the modem as measured by the Iridium constellation including a timestamp of when geolocation measurement was made. @@ -313,7 +322,7 @@ def geolocation(self): This geolocation coordinate system is known as ECEF (acronym earth-centered, earth-fixed), also known as ECR (initialism for earth-centered rotational) - is a time_struct + is a time.struct_time The timestamp is assigned by the modem when the geolocation grid code received from the network is stored to the modem's internal memory. @@ -321,12 +330,12 @@ def geolocation(self): 90 millisecond intervals, since Sunday May 11, 2014, at 14:23:55 UTC (the most recent Iridium epoch). The timestamp returned by the modem is a 32-bit integer displayed in hexadecimal form. - We convert the modem's timestamp and return it as a time_struct. + We convert the modem's timestamp and return it as a time.struct_time. The system time value is always expressed in UTC time. Returns a tuple: - (int, int, int, time_struct) + (int, int, int, time.struct_time) """ resp = self._uart_xfer("-MSGEO") if resp[-1].strip().decode() == "OK": @@ -358,7 +367,7 @@ def geolocation(self): return (None,) * 4 @property - def system_time(self): + def system_time(self) -> Optional[time.struct_time]: """Current date and time as given by the Iridium network. The system time is available and valid only after the ISU has registered with @@ -371,12 +380,12 @@ def system_time(self): 90 millisecond intervals, since Sunday May 11, 2014, at 14:23:55 UTC (the most recent Iridium epoch). The timestamp returned by the modem is a 32-bit integer displayed in hexadecimal form. - We convert the modem's timestamp and return it as a time_struct. + We convert the modem's timestamp and return it as a time.struct_time. The system time value is always expressed in UTC time. Returns: - time_struct + time.struct_time """ resp = self._uart_xfer("-MSSTM") if resp[-1].strip().decode() == "OK": @@ -405,7 +414,7 @@ def system_time(self): return None @property - def energy_monitor(self): + def energy_monitor(self) -> Optional[int]: """The current accumulated energy usage estimate in microamp hours. Returns an estimate of the charge taken from the +5V supply to the modem, @@ -436,7 +445,7 @@ def energy_monitor(self): return None @energy_monitor.setter - def energy_monitor(self, value): + def energy_monitor(self, value: int) -> Optional[int]: if 0 <= value <= 67108863: # 0 to 2^26 - 1 resp = self._uart_xfer("+GEMON=" + str(value)) if resp[-1].strip().decode() == "OK": diff --git a/optional_requirements.txt b/optional_requirements.txt index d4e27c4..5128ee5 100644 --- a/optional_requirements.txt +++ b/optional_requirements.txt @@ -1,3 +1,5 @@ # SPDX-FileCopyrightText: 2022 Alec Delaney, for Adafruit Industries # # SPDX-License-Identifier: Unlicense + +pyserial