From 621610f2aff3c276fd12dcb5b41e005bacf6d659 Mon Sep 17 00:00:00 2001 From: zachariah pifer Date: Tue, 25 Apr 2023 12:31:52 -0600 Subject: [PATCH 1/5] first pass at annotations --- adafruit_rockblock.py | 60 ++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/adafruit_rockblock.py b/adafruit_rockblock.py index 3cef277..f1708f1 100644 --- a/adafruit_rockblock.py +++ b/adafruit_rockblock.py @@ -24,7 +24,13 @@ https://github.com/adafruit/circuitpython/releases """ - +try: + from typing import Tuple, List, Union, Optional + from board import UART + from serial import Serial + from time import time_struct +except ImportError: + pass import time import struct @@ -36,13 +42,13 @@ class RockBlock: """Driver for RockBLOCK Iridium satellite modem.""" - def __init__(self, uart, baudrate=19200): + def __init__(self, uart: Union[UART, Serial], baudrate: int = 19200) -> None: self._uart = uart self._uart.baudrate = baudrate self._buf_out = None self.reset() - def _uart_xfer(self, cmd): + def _uart_xfer(self, cmd: str) -> Tuple[List[bytes]]: """Send AT command and return response as tuple of lines read.""" self._uart.reset_input_buffer() self._uart.write(str.encode("AT" + cmd + "\r")) @@ -58,22 +64,22 @@ def _uart_xfer(self, cmd): return tuple(resp) - def reset(self): + def reset(self) -> None: """Perform a software reset.""" self._uart_xfer("&F0") # factory defaults self._uart_xfer("&K0") # flow control off - def _transfer_buffer(self): + def _transfer_buffer(self) -> None: """Copy out buffer to in buffer to simulate receiving a message.""" self._uart_xfer("+SBDTC") @property - def data_out(self): + def data_out(self) -> None: """The binary data in the outbound buffer.""" return self._buf_out @data_out.setter - def data_out(self, buf): + def data_out(self, buf: bytes) -> None: if buf is None: # clear the buffer resp = self._uart_xfer("+SBDD0") @@ -100,7 +106,7 @@ def data_out(self, buf): self._buf_out = buf @property - def text_out(self): + def text_out(self) -> Optional[str]: """The text in the outbound buffer.""" text = None # TODO: add better check for non-text in buffer @@ -112,7 +118,7 @@ def text_out(self): return text @text_out.setter - def text_out(self, text): + def text_out(self, text: str) -> None: if not isinstance(text, str): raise ValueError("Only strings allowed.") if len(text) > 120: @@ -120,7 +126,7 @@ def text_out(self, text): self.data_out = str.encode(text) @property - def data_in(self): + def data_in(self) -> Optional[List[bytes]]: """The binary data in the inbound buffer.""" data = None if self.status[2] == 1: @@ -130,7 +136,7 @@ def data_in(self): return data @data_in.setter - def data_in(self, buf): + def data_in(self, buf: bytes) -> None: if buf is not None: raise ValueError("Can only set in buffer to None to clear.") resp = self._uart_xfer("+SBDD1") @@ -139,7 +145,7 @@ def data_in(self, buf): raise RuntimeError("Error clearing buffer.") @property - def text_in(self): + def text_in(self) -> Optional[str]: """The text in the inbound buffer.""" text = None if self.status[2] == 1: @@ -151,10 +157,10 @@ def text_in(self): return text @text_in.setter - def text_in(self, text): + def text_in(self, text: str) -> None: self.data_in = text - def satellite_transfer(self, location=None): + def satellite_transfer(self, location: str = None) -> Tuple[Optional[str], ...]: """Initiate a Short Burst Data transfer with satellites.""" status = (None,) * 6 if location: @@ -170,7 +176,7 @@ def satellite_transfer(self, location=None): return tuple(status) @property - def status(self): + def status(self) -> Tuple[Optional[str], ...]: """Return tuple of Short Burst Data status.""" resp = self._uart_xfer("+SBDSX") if resp[-1].strip().decode() == "OK": @@ -179,7 +185,7 @@ def status(self): return (None,) * 6 @property - def model(self): + def model(self) -> Optional[str]: """Return modem model.""" resp = self._uart_xfer("+GMM") if resp[-1].strip().decode() == "OK": @@ -187,11 +193,11 @@ def model(self): return None @property - def serial_number(self): + def serial_number(self) -> Optional[str]: """Modem's serial number, also known as the modem's IMEI. Returns - string + str | None """ resp = self._uart_xfer("+CGSN") if resp[-1].strip().decode() == "OK": @@ -199,7 +205,7 @@ def serial_number(self): return None @property - def signal_quality(self): + def signal_quality(self) -> int: """Signal Quality also known as the Received Signal Strength Indicator (RSSI). Values returned are 0 to 5, where 0 is no signal (0 bars) and 5 is strong signal (5 bars). @@ -217,7 +223,7 @@ def signal_quality(self): return None @property - def revision(self): + def revision(self) -> Tuple[ Optional[str], ...]: """Modem's internal component firmware revisions. For example: Call Processor Version, Modem DSP Version, DBB Version (ASIC), @@ -237,7 +243,7 @@ def revision(self): return (None,) * 7 @property - def ring_alert(self): + def ring_alert(self) -> Optional[bool]: """The current ring indication mode. False means Ring Alerts are disabled, and True means Ring Alerts are enabled. @@ -255,7 +261,7 @@ def ring_alert(self): return None @ring_alert.setter - def ring_alert(self, value): + def ring_alert(self, value: Union[int, bool]) -> Optional[bool]: if value in (True, False): resp = self._uart_xfer("+SBDMTA=" + str(int(value))) if resp[-1].strip().decode() == "OK": @@ -266,7 +272,7 @@ def ring_alert(self, value): ) @property - def ring_indication(self): + def ring_indication(self) -> Tuple[ Optional[str], ...]: """The ring indication status. Returns the reason for the most recent assertion of the Ring Indicate signal. @@ -294,7 +300,7 @@ def ring_indication(self): return (None,) * 2 @property - def geolocation(self): + def geolocation(self) -> Union[Tuple[int, int, int, time_struct], Tuple[None, None, None, None]]: """Most recent geolocation of the modem as measured by the Iridium constellation including a timestamp of when geolocation measurement was made. @@ -358,7 +364,7 @@ def geolocation(self): return (None,) * 4 @property - def system_time(self): + def system_time(self) -> Optional[time_struct]: """Current date and time as given by the Iridium network. The system time is available and valid only after the ISU has registered with @@ -405,7 +411,7 @@ def system_time(self): return None @property - def energy_monitor(self): + def energy_monitor(self) -> Optional[int]: """The current accumulated energy usage estimate in microamp hours. Returns an estimate of the charge taken from the +5V supply to the modem, @@ -436,7 +442,7 @@ def energy_monitor(self): return None @energy_monitor.setter - def energy_monitor(self, value): + def energy_monitor(self, value: int) -> Optional[int]: if 0 <= value <= 67108863: # 0 to 2^26 - 1 resp = self._uart_xfer("+GEMON=" + str(value)) if resp[-1].strip().decode() == "OK": From 0284d376fbba51e2e41be7d0f3e1a207b10ea67b Mon Sep 17 00:00:00 2001 From: zachariah pifer Date: Tue, 25 Apr 2023 12:35:55 -0600 Subject: [PATCH 2/5] formatting --- adafruit_rockblock.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/adafruit_rockblock.py b/adafruit_rockblock.py index f1708f1..5e24042 100644 --- a/adafruit_rockblock.py +++ b/adafruit_rockblock.py @@ -223,7 +223,7 @@ def signal_quality(self) -> int: return None @property - def revision(self) -> Tuple[ Optional[str], ...]: + def revision(self) -> Tuple[Optional[str], ...]: """Modem's internal component firmware revisions. For example: Call Processor Version, Modem DSP Version, DBB Version (ASIC), @@ -272,7 +272,7 @@ def ring_alert(self, value: Union[int, bool]) -> Optional[bool]: ) @property - def ring_indication(self) -> Tuple[ Optional[str], ...]: + def ring_indication(self) -> Tuple[Optional[str], ...]: """The ring indication status. Returns the reason for the most recent assertion of the Ring Indicate signal. @@ -300,7 +300,9 @@ def ring_indication(self) -> Tuple[ Optional[str], ...]: return (None,) * 2 @property - def geolocation(self) -> Union[Tuple[int, int, int, time_struct], Tuple[None, None, None, None]]: + def geolocation( + self, + ) -> Union[Tuple[int, int, int, time_struct], Tuple[None, None, None, None]]: """Most recent geolocation of the modem as measured by the Iridium constellation including a timestamp of when geolocation measurement was made. From 6907402afec29b107da333004d84d0cbcdec07c6 Mon Sep 17 00:00:00 2001 From: zachariah pifer Date: Tue, 25 Apr 2023 15:15:20 -0600 Subject: [PATCH 3/5] another pass --- adafruit_rockblock.py | 31 +++++++++++++++---------------- requirements.txt | 2 ++ 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/adafruit_rockblock.py b/adafruit_rockblock.py index 5e24042..1b60b79 100644 --- a/adafruit_rockblock.py +++ b/adafruit_rockblock.py @@ -25,10 +25,9 @@ """ try: - from typing import Tuple, List, Union, Optional + from typing import Tuple, Union, Optional from board import UART from serial import Serial - from time import time_struct except ImportError: pass @@ -48,7 +47,7 @@ def __init__(self, uart: Union[UART, Serial], baudrate: int = 19200) -> None: self._buf_out = None self.reset() - def _uart_xfer(self, cmd: str) -> Tuple[List[bytes]]: + def _uart_xfer(self, cmd: str) -> Tuple[bytes, ...]: """Send AT command and return response as tuple of lines read.""" self._uart.reset_input_buffer() self._uart.write(str.encode("AT" + cmd + "\r")) @@ -74,7 +73,7 @@ def _transfer_buffer(self) -> None: self._uart_xfer("+SBDTC") @property - def data_out(self) -> None: + def data_out(self) -> Optional[bytes]: """The binary data in the outbound buffer.""" return self._buf_out @@ -126,7 +125,7 @@ def text_out(self, text: str) -> None: self.data_out = str.encode(text) @property - def data_in(self) -> Optional[List[bytes]]: + def data_in(self) -> Optional[bytes]: """The binary data in the inbound buffer.""" data = None if self.status[2] == 1: @@ -157,10 +156,10 @@ def text_in(self) -> Optional[str]: return text @text_in.setter - def text_in(self, text: str) -> None: + def text_in(self, text: bytes) -> None: self.data_in = text - def satellite_transfer(self, location: str = None) -> Tuple[Optional[str], ...]: + def satellite_transfer(self, location: str = None) -> Tuple[Optional[int], ...]: """Initiate a Short Burst Data transfer with satellites.""" status = (None,) * 6 if location: @@ -176,7 +175,7 @@ def satellite_transfer(self, location: str = None) -> Tuple[Optional[str], ...]: return tuple(status) @property - def status(self) -> Tuple[Optional[str], ...]: + def status(self) -> Tuple[Optional[int], ...]: """Return tuple of Short Burst Data status.""" resp = self._uart_xfer("+SBDSX") if resp[-1].strip().decode() == "OK": @@ -205,7 +204,7 @@ def serial_number(self) -> Optional[str]: return None @property - def signal_quality(self) -> int: + def signal_quality(self) -> Optional[int]: """Signal Quality also known as the Received Signal Strength Indicator (RSSI). Values returned are 0 to 5, where 0 is no signal (0 bars) and 5 is strong signal (5 bars). @@ -302,7 +301,7 @@ def ring_indication(self) -> Tuple[Optional[str], ...]: @property def geolocation( self, - ) -> Union[Tuple[int, int, int, time_struct], Tuple[None, None, None, None]]: + ) -> Union[Tuple[int, int, int, time.struct_time], Tuple[None, None, None, None]]: """Most recent geolocation of the modem as measured by the Iridium constellation including a timestamp of when geolocation measurement was made. @@ -321,7 +320,7 @@ def geolocation( This geolocation coordinate system is known as ECEF (acronym earth-centered, earth-fixed), also known as ECR (initialism for earth-centered rotational) - is a time_struct + is a time.struct_time The timestamp is assigned by the modem when the geolocation grid code received from the network is stored to the modem's internal memory. @@ -329,12 +328,12 @@ def geolocation( 90 millisecond intervals, since Sunday May 11, 2014, at 14:23:55 UTC (the most recent Iridium epoch). The timestamp returned by the modem is a 32-bit integer displayed in hexadecimal form. - We convert the modem's timestamp and return it as a time_struct. + We convert the modem's timestamp and return it as a time.struct_time. The system time value is always expressed in UTC time. Returns a tuple: - (int, int, int, time_struct) + (int, int, int, time.struct_time) """ resp = self._uart_xfer("-MSGEO") if resp[-1].strip().decode() == "OK": @@ -366,7 +365,7 @@ def geolocation( return (None,) * 4 @property - def system_time(self) -> Optional[time_struct]: + def system_time(self) -> Optional[time.struct_time]: """Current date and time as given by the Iridium network. The system time is available and valid only after the ISU has registered with @@ -379,12 +378,12 @@ def system_time(self) -> Optional[time_struct]: 90 millisecond intervals, since Sunday May 11, 2014, at 14:23:55 UTC (the most recent Iridium epoch). The timestamp returned by the modem is a 32-bit integer displayed in hexadecimal form. - We convert the modem's timestamp and return it as a time_struct. + We convert the modem's timestamp and return it as a time.struct_time. The system time value is always expressed in UTC time. Returns: - time_struct + time.struct_time """ resp = self._uart_xfer("-MSSTM") if resp[-1].strip().decode() == "OK": diff --git a/requirements.txt b/requirements.txt index 7a984a4..a355bc7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ # SPDX-License-Identifier: Unlicense Adafruit-Blinka +board +serial From 50c712c8aa41a257f7b1a3606294987f125efe4d Mon Sep 17 00:00:00 2001 From: zachariah pifer Date: Tue, 25 Apr 2023 15:49:37 -0600 Subject: [PATCH 4/5] add __future__ annoations --- adafruit_rockblock.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/adafruit_rockblock.py b/adafruit_rockblock.py index 1b60b79..cae3506 100644 --- a/adafruit_rockblock.py +++ b/adafruit_rockblock.py @@ -24,6 +24,8 @@ https://github.com/adafruit/circuitpython/releases """ +from __future__ import annotations + try: from typing import Tuple, Union, Optional from board import UART From 78126affcbd7ac652afaa8dc7d85470dce0d58e3 Mon Sep 17 00:00:00 2001 From: zachariah pifer Date: Thu, 27 Apr 2023 16:50:07 -0600 Subject: [PATCH 5/5] board->busio, remove from requirments, add pyserial to optional_requirements --- adafruit_rockblock.py | 2 +- optional_requirements.txt | 2 ++ requirements.txt | 2 -- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/adafruit_rockblock.py b/adafruit_rockblock.py index cae3506..c2ab5c9 100644 --- a/adafruit_rockblock.py +++ b/adafruit_rockblock.py @@ -28,7 +28,7 @@ try: from typing import Tuple, Union, Optional - from board import UART + from busio import UART from serial import Serial except ImportError: pass diff --git a/optional_requirements.txt b/optional_requirements.txt index d4e27c4..5128ee5 100644 --- a/optional_requirements.txt +++ b/optional_requirements.txt @@ -1,3 +1,5 @@ # SPDX-FileCopyrightText: 2022 Alec Delaney, for Adafruit Industries # # SPDX-License-Identifier: Unlicense + +pyserial diff --git a/requirements.txt b/requirements.txt index a355bc7..7a984a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,3 @@ # SPDX-License-Identifier: Unlicense Adafruit-Blinka -board -serial